Skip to content

スケジューラ

これまでに何度か「スケジューラ」という言葉が出てきました. このページではそのスケジューラについて詳しく見てきます.

スケジューラとは

スケジューラはあるタスクをスケジューリングして実行するためのものです.
時には実行タイミングの調整であったり,時にはキューイングであったり.

OS なんかにも,プロセスをスケジューリングするスケジューラがあったりします.

Vue.js に関してもさまざまな作用をスケジュールする機構があります.
これは vuejs/core-vapor (runtime-vapor) に限らず,vuejs/core (runtime-core) から元々あるコンセプトです.
例えば,皆さんがよく知る nextTick というのはこのスケジューラの API です.

https://vuejs.org/api/general.html#nexttick

他にも,watchwatchEffect などのウォッチャにオプションとして設定できる flush の項目もスケジュールの実行に関するものです.

https://vuejs.org/api/reactivity-core.html#watch

44export interface WatchEffectOptions extends DebuggerOptions {
45  flush?: 'pre' | 'post' | 'sync'
46}

スケジューラ API の概要

詳細な実装を見ていく前に,実際にスケジューラをどのように使うのかという部分を見ていきます.
これは Vue.js が内部的にどう使うか,という話で, Vue.js のユーザーが直接使う API ではありません.

スケジューラの実装は packages/runtime-vapor/src/scheduler.ts にあります.

まず,基本的な構造として queuejob があります.
そして,queue には 2 種類のもがあります.
queuependingPostFlushCbs です.

22const queue: SchedulerJob[] = []
23let flushIndex = 0
28const pendingPostFlushCbs: SchedulerJob[] = []
29let activePostFlushCbs: SchedulerJob[] | null = null
30let postFlushIndex = 0

ここでは実際にキューされた job と,現在実行中の index を管理しています.

job は実際の実行対象です. Function に idflag (後述) を生やしたものです.

23export interface SchedulerJob extends Function {
24  id?: number
25  /**
26   * flags can technically be undefined, but it can still be used in bitwise
27   * operations just like 0.
28   */
29  flags?: SchedulerJobFlags
30}

次にこれらを操作する関数についてです.

queuejob を追加する queueJob と,queue にある job を実行する queueFlush, flushJobs があります.
(flushJobsqueueFlush から呼ばれます.)
そして,flushJobs では queue のジョブを実行した後に,pendingPostFlushCbs にあるジョブも実行します.

35export function queueJob(job: SchedulerJob): void {
110function flushJobs() {

また,flushPostFlushCbsjob を追加する queuePostFlushCb と,pendingPostFlushCbs にある job を実行する flushPostFlushCbs があります.
(前述の通りflushPostFlushCbsflushJobs からも呼ばれています.)

57export function queuePostFlushCb(cb: SchedulerJobs): void {
81export function flushPostFlushCbs(): void {

そしてこれらのジョブの実行 (flushJobs) は Promise でラップされており (Promise.resolve().then(flushJobs) のイメージ),現在のジョブ実行 (Promise) は currentFlushPromise として管理されています.
そしてこの currentFlushPromise に then に繋ぐ形でタスクのスケジューリングを行なっています.

そして,皆さんのよく知る nextTick はこの currentFlushPromisethen に cb を登録するだけの関数です.

144export function nextTick<T = void, R = void>(
145  this: T,
146  fn?: (this: T) => R,
147): Promise<Awaited<R>> {
148  const p = currentFlushPromise || resolvedPromise
149  return fn ? p.then(this ? fn.bind(this) : fn) : p
150}

どこで使われている?

実際に queue を操作している実装がどこにあるかを見てもます.

queueJob

Vapor で現状使われているのは 3 箇所です.

161      effect.scheduler = () => queueJob(triggerRenderingUpdate)
65  effect.scheduler = () => {
66    isTriggered = true
67    queueJob(triggerRenderingUpdate)
68  }
41  effect.scheduler = () => queueJob(job)

いずれに共通しているのは,effect.scheduler に設定しているというところです.
これらが何なのかについては少し先を読んでからにしましょう.

queueFlush

queueFlush に反しては scheduler の実装の内部でしか扱われていません.
どのタイミングでこれらが実行されるかは実装の詳細を見る時に見ていきましょう.

queuePostFlushCb

こちらは何箇所か使われています.

165  invokeLifecycle(
166    instance,
167    VaporLifecycleHooks.UNMOUNTED,
168    'unmounted',
169    instance => queuePostFlushCb(() => (instance.isUnmounted = true)),
170    true,
171  )
16  function invokeCurrent() {
17    cb && cb(instance)
18    const hooks = instance[lifecycle]
19    if (hooks) {
20      const fn = () => {
21        const reset = setCurrentInstance(instance)
22        instance.scope.run(() => invokeArrayFns(hooks))
23        reset()
24      }
25      post ? queuePostFlushCb(fn) : fn()
26    }
27
28    invokeDirectiveHook(instance, directive, instance.scope)
29  }
251      queuePostFlushCb(() => {
252        instance.isUpdating = false
253        const reset = setCurrentInstance(instance)
254        if (dirs) {
255          invokeDirectiveHook(instance, 'updated', scope)
256        }
257        // updated hook
258        if (u) {
259          queuePostFlushCb(u)
260        }
261        reset()
262      })
137  if (!dirs) {
138    const res = handler && handler()
139    if (name === 'mount') {
140      queuePostFlushCb(() => (scope.im = true))
141    }
142    return res
143  }
144
145  invokeDirectiveHook(instance, before, scope)
146  try {
147    if (handler) {
148      return handler()
149    }
150  } finally {
151    queuePostFlushCb(() => {
152      invokeDirectiveHook(instance, after, scope)
153    })
154  }
74      queuePostFlushCb(() => {
75        instance.isUpdating = false
76        const reset = setCurrentInstance(instance)
77        if (dirs) {
78          invokeDirectiveHook(instance, 'updated', scope)
79        }
80        // updated hook
81        if (u) {
82          queuePostFlushCb(u)
83        }
84        reset()
85      })

上記の 5 に共通することは,何らかしらのライフサイクルフックであることです. そのフックに登録されたコールバック関数の実行を pendingPostFlushCbs に追加しておくようです.

updated や mounted, unmounted などのライフサイクルはその場で実行してしまうとまだ DOM にその内容が反映されていないことがあると思います.
スケジューラによって Promise (イベントループ) をコントロールすることで実行タイミングを制御していそうです. 詳しくはまた実装の詳細と合わせて読んでみましょう.

29export function on(
30  el: Element,
31  event: string,
32  handlerGetter: () => undefined | ((...args: any[]) => any),
33  options: AddEventListenerOptions &
34    ModifierOptions & { effect?: boolean } = {},
35): void {
36  const handler: DelegatedHandler = eventHandler(handlerGetter, options)
37  let cleanupEvent: (() => void) | undefined
38  queuePostFlushCb(() => {
39    cleanupEvent = addEventListener(el, event, handler, options)
40  })
117      queuePostFlushCb(doSet)
118
119      onScopeDispose(() => {
120        queuePostFlushCb(() => {
121          if (isArray(existing)) {
122            remove(existing, refValue)
123          } else if (_isString) {
124            refs[ref] = null
125            if (hasOwn(setupState, ref)) {
126              setupState[ref] = null
127            }
128          } else if (_isRef) {
129            ref.value = null
130          }
131        })
132      })

上記の 2 つに関してはそもそもまだ event や templateRef が登場していないので一旦スルーしましょう.

flushPostFlushCbs

こちらは主に apiRender.ts で登場します. この本の t ランタイムの解説でも登場しましたね.

コンポーネントをマウントした後で flush しているようです.

106export function render(
107  instance: ComponentInternalInstance,
108  container: string | ParentNode,
109): void {
110  mountComponent(instance, (container = normalizeContainer(container)))
111  flushPostFlushCbs()
112}

アンマウント時も同様です.

155export function unmountComponent(instance: ComponentInternalInstance): void {
156  const { container, block, scope } = instance
157
158  // hook: beforeUnmount
159  invokeLifecycle(instance, VaporLifecycleHooks.BEFORE_UNMOUNT, 'beforeUnmount')
160
161  scope.stop()
162  block && remove(block, container)
163
164  // hook: unmounted
165  invokeLifecycle(
166    instance,
167    VaporLifecycleHooks.UNMOUNTED,
168    'unmounted',
169    instance => queuePostFlushCb(() => (instance.isUnmounted = true)),
170    true,
171  )
172  flushPostFlushCbs()
173}

実装の詳細

さて,続いてはこれら 4 つの関数の実装を見ていきましょう.

queueJob

まずは queueJob です.

35export function queueJob(job: SchedulerJob): void {
36  let lastOne: SchedulerJob | undefined
37  if (!(job.flags! & SchedulerJobFlags.QUEUED)) {
38    if (job.id == null) {
39      queue.push(job)
40    } else if (
41      // fast path when the job id is larger than the tail
42      !(job.flags! & SchedulerJobFlags.PRE) &&
43      job.id >= (((lastOne = queue[queue.length - 1]) && lastOne.id) || 0)
44    ) {
45      queue.push(job)
46    } else {
47      queue.splice(findInsertionIndex(job.id), 0, job)
48    }
49
50    if (!(job.flags! & SchedulerJobFlags.ALLOW_RECURSE)) {
51      job.flags! |= SchedulerJobFlags.QUEUED
52    }
53    queueFlush()
54  }
55}

引数で渡された jobflags をみて,すでにキューに追加されているかどうかを判定しています.
されている場合は無視します.

そして,jobid が設定されていない場合は無条件で queue に追加します.
なぜならもうこれは重複排除等の制御が不可能だからです (識別できないので).

それからは,flagsPRE 出ない場合は末尾に追加し,そうでない場合は然るべき index に挿入します.
その index は findInsertionIndexid を元に探します.

152// #2768
153// Use binary-search to find a suitable position in the queue,
154// so that the queue maintains the increasing order of job's id,
155// which can prevent the job from being skipped and also can avoid repeated patching.
156function findInsertionIndex(id: number) {
157  // the start index should be `flushIndex + 1`
158  let start = flushIndex + 1
159  let end = queue.length
160
161  while (start < end) {
162    const middle = (start + end) >>> 1
163    const middleJob = queue[middle]
164    const middleJobId = getId(middleJob)
165    if (
166      middleJobId < id ||
167      (middleJobId === id && middleJob.flags! & SchedulerJobFlags.PRE)
168    ) {
169      start = middle + 1
170    } else {
171      end = middle
172    }
173  }
174
175  return start
176}

キューは id が増加する順が守られているという仕様なので,二分探索によって高速に位置を特定します.

そこまで終わったら flagsQUEUED に設定しておしまいです.
ここでポイントとなるのは,最後に queueFlush() している点です.

続いて queueFlush を見ていきましょう.

queueFlush -> flushJobs

74function queueFlush() {
75  if (!isFlushing && !isFlushPending) {
76    isFlushPending = true
77    currentFlushPromise = resolvedPromise.then(flushJobs)
78  }
79}

queueFlush は resolvedPromise.then(flushJobs) を呼び出しているだけです.
この時,flushJobsresolvedPromise.then でラップし,その Promise を currentFlushPromise に設定しておきます.

flushJobs をみてみましょう.

110function flushJobs() {
111  isFlushPending = false
112  isFlushing = true
113
114  // Sort queue before flush.
115  // This ensures that:
116  // 1. Components are updated from parent to child. (because parent is always
117  //    created before the child so its render effect will have smaller
118  //    priority number)
119  // 2. If a component is unmounted during a parent component's update,
120  //    its update can be skipped.
121  queue.sort(comparator)
122
123  try {
124    for (let i = 0; i < queue!.length; i++) {
125      queue[i]()
126      queue[i].flags! &= ~SchedulerJobFlags.QUEUED
127    }
128  } finally {
129    flushIndex = 0
130    queue.length = 0
131
132    flushPostFlushCbs()
133
134    isFlushing = false
135    currentFlushPromise = null
136    // some postFlushCb queued jobs!
137    // keep flushing until it drains.
138    if (queue.length || pendingPostFlushCbs.length) {
139      flushJobs()
140    }
141  }
142}

まず queue は id によってソートされます.

121  queue.sort(comparator)
181const comparator = (a: SchedulerJob, b: SchedulerJob): number => {
182  const diff = getId(a) - getId(b)
183  if (diff === 0) {
184    const isAPre = a.flags! & SchedulerJobFlags.PRE
185    const isBPre = b.flags! & SchedulerJobFlags.PRE
186    if (isAPre && !isBPre) return -1
187    if (isBPre && !isAPre) return 1
188  }
189  return diff
190}

そうしたら,順に実行していきます.

123  try {
124    for (let i = 0; i < queue!.length; i++) {
125      queue[i]()
126      queue[i].flags! &= ~SchedulerJobFlags.QUEUED
127    }

finally で flushPostFlushCbs も実行してあげつつ,最後にまた queuependingPostFlushCbs をみて,まだ job が残っている場合は再度 flushJobs を再帰的に呼び出します.

128  } finally {
129    flushIndex = 0
130    queue.length = 0
131
132    flushPostFlushCbs()
133
134    isFlushing = false
135    currentFlushPromise = null
136    // some postFlushCb queued jobs!
137    // keep flushing until it drains.
138    if (queue.length || pendingPostFlushCbs.length) {
139      flushJobs()
140    }

queuePostFlushCb

こちらも,対象が pendingPostFlushCbs になっただけで,基本的な流れは queueJob と同じです.

57export function queuePostFlushCb(cb: SchedulerJobs): void {
58  if (!isArray(cb)) {
59    if (!(cb.flags! & SchedulerJobFlags.QUEUED)) {
60      pendingPostFlushCbs.push(cb)
61      if (!(cb.flags! & SchedulerJobFlags.ALLOW_RECURSE)) {
62        cb.flags! |= SchedulerJobFlags.QUEUED
63      }
64    }
65  } else {
66    // if cb is an array, it is a component lifecycle hook which can only be
67    // triggered by a job, which is already deduped in the main queue, so
68    // we can skip duplicate check here to improve perf
69    pendingPostFlushCbs.push(...cb)
70  }
71  queueFlush()
72}

キューイング後の flush に関しては queueFlush であることだけ覚えておけば大丈夫です. (queue も消化される)

flushPostFlushCbs

81export function flushPostFlushCbs(): void {
82  if (!pendingPostFlushCbs.length) return
83
84  const deduped = [...new Set(pendingPostFlushCbs)]
85  pendingPostFlushCbs.length = 0
86
87  // #1947 already has active queue, nested flushPostFlushCbs call
88  if (activePostFlushCbs) {
89    activePostFlushCbs.push(...deduped)
90    return
91  }
92
93  activePostFlushCbs = deduped
94
95  activePostFlushCbs.sort((a, b) => getId(a) - getId(b))
96
97  for (
98    postFlushIndex = 0;
99    postFlushIndex < activePostFlushCbs.length;
100    postFlushIndex++
101  ) {
102    activePostFlushCbs[postFlushIndex]()
103    activePostFlushCbs[postFlushIndex].flags! &= ~SchedulerJobFlags.QUEUED
104  }
105  activePostFlushCbs = null
106  postFlushIndex = 0
107}

こちらも new Set によって重複排除を行いつつ.id によってソートして順に pendingPostFlushCbs を実行しているだけです.

ReactiveEffect と Scheduler

さて,スケジューラに関してもう一箇所,把握しておかなければならないところがあります.

いずれに共通しているのは,effect.scheduler に設定しているというところです.

の部分です.

effect が持つ scheduler というオプションいたしして,処理をラップする形で queueJob を行なっていました.
果たしてこの effect.scheduler とは何なのでしょうか.

effectReactiveEffect のインスタンスです.

113export class ReactiveEffect<T = any>

実際に実行したい関数 (fn) を受け取り,インスタンスを生成します.

142  constructor(public fn: () => T) {

そして,ReactiveEffect を実行するためのメソッドは 2 つあります.
runtrigger です.

226  trigger(): void {

run の方は,

ts
const effect = new ReactiveEffect(() => console.log("effect"));
effect.run();

のように任意のタイミングで実行することができます.

renderEffect の初回実行時にもこの run によって実行されています.

37  const effect = new ReactiveEffect(() =>
38    callWithAsyncErrorHandling(cb, instance, VaporErrorCodes.RENDER_FUNCTION),
39  )
40
41  effect.scheduler = () => queueJob(job)
42  if (__DEV__ && instance) {
43    effect.onTrack = instance.rtc
44      ? e => invokeArrayFns(instance.rtc!, e)
45      : void 0
46    effect.onTrigger = instance.rtg
47      ? e => invokeArrayFns(instance.rtg!, e)
48      : void 0
49  }
50  effect.run()

一方で,trigger の方は基本的にはリアクティビティが形成されている時に使われます.
例えば,ref オブジェクトの set が走った時です.

134  set value(newValue) {

153        this.dep.trigger()

118  trigger(debugInfo?: DebuggerEventExtraInfo): void {
119    this.version++
120    globalVersion++
121    this.notify(debugInfo)
122  }

148      for (let link = this.subs; link; link = link.prevSub) {
149        link.sub.notify()
150      }

173      return this.trigger()

そして,この trigger 関数を見てみると,

226  trigger(): void {
227    if (this.flags & EffectFlags.PAUSED) {
228      pausedQueueEffects.add(this)
229    } else if (this.scheduler) {
230      this.scheduler()
231    } else {
232      this.runIfDirty()
233    }
234  }

scheduler を持っている場合にはそちらが優先的に実行されるようになっています.

これは,ある依存関係を元にリアクティブにエフェクトがトリガーされる際,余計な実行が起こらないようにするための仕組みです.
scheduler プロパティにはスケジューラにキューする処理を適切に設定し,作用の実行を最適化することができます.

例えば,renderEffect の実装を見てみましょう.
renderEffect では,scheduler に () => queueJob(job) を設定しています.

37  const effect = new ReactiveEffect(() =>
38    callWithAsyncErrorHandling(cb, instance, VaporErrorCodes.RENDER_FUNCTION),
39  )
40
41  effect.scheduler = () => queueJob(job)

そして,以下のように renderEffect を呼び出していたとします.

ts
const t0 = template("<p></p>");

const n0 = t0();
const count = ref(0);
const effect = () => setText(n0, count.value);
renderEffect(effect);

こうすると,counteffect (をラップした job) が track され,count が変更された際にその jobtrigger します.
trigger じには内部で設定された scheduler プロパティの方が実行されますが,今回は「job の実行」ではなく,あくまで「jobqueue に追加する」が設定されているため,即時には実行されずにスケジューラに渡されます.

ここで,このような trigger を考えてみましょう.

ts
const count = ref(0);
const effect = () => setText(n0, count.value);
renderEffect(effect);

count.value = 1; // enqueue job
count.value = 2; // enqueue job

このようにすると 2 回 () => queueJob(job) が実行されることになります.
そして,queueJob の実装を思い出して欲しいのですが,

37  if (!(job.flags! & SchedulerJobFlags.QUEUED)) {

すでにその job が追加されている場合は無視されるようになっています.
この関数の最後に queueFlush を実行しているので,毎回空っぽになりそうな気はしますが,実はこれは Promise で繋がっているため,この時点ではまだ flush されておらず,queue には job が残っている状態になります.

これにより,イベントループを媒介した job の重複排除を実現し,不要な実行を抑制することができます.
実際に,考えてみて欲しいのですが,

ts
count.value = 1;
count.value = 2;

と書いたところで,画面的には,2 回目の

ts
setText(n0, 2);

だけの実行で問題ないはずです.

これであらかたスケジューラの理解はできたはずです.
余計な作用の実行を制御するために Promise と queue を活用し,ライフサイクルフックの実行で画面の更新などを待った後で実行するのが正しいものは pendingPostFlushCbs という別のキューを用意し,実行タイミングを制御しています.