スケジューラ
これまでに何度か「スケジューラ」という言葉が出てきました. このページではそのスケジューラについて詳しく見てきます.
スケジューラとは
スケジューラはあるタスクをスケジューリングして実行するためのものです.
時には実行タイミングの調整であったり,時にはキューイングであったり.
OS なんかにも,プロセスをスケジューリングするスケジューラがあったりします.
Vue.js に関してもさまざまな作用をスケジュールする機構があります.
これは vuejs/core-vapor (runtime-vapor) に限らず,vuejs/core (runtime-core) から元々あるコンセプトです.
例えば,皆さんがよく知る nextTick
というのはこのスケジューラの API です.
https://vuejs.org/api/general.html#nexttick
他にも,watch
や watchEffect
などのウォッチャにオプションとして設定できる flush
の項目もスケジュールの実行に関するものです.
https://vuejs.org/api/reactivity-core.html#watch
44export interface WatchEffectOptions extends DebuggerOptions {
45 flush?: 'pre' | 'post' | 'sync'
46}
スケジューラ API の概要
詳細な実装を見ていく前に,実際にスケジューラをどのように使うのかという部分を見ていきます.
これは Vue.js が内部的にどう使うか,という話で, Vue.js のユーザーが直接使う API ではありません.
スケジューラの実装は packages/runtime-vapor/src/scheduler.ts にあります.
まず,基本的な構造として queue
と job
があります.
そして,queue には 2 種類のもがあります.queue
と pendingPostFlushCbs
です.
22const queue: SchedulerJob[] = []
23let flushIndex = 0
28const pendingPostFlushCbs: SchedulerJob[] = []
29let activePostFlushCbs: SchedulerJob[] | null = null
30let postFlushIndex = 0
ここでは実際にキューされた job と,現在実行中の index を管理しています.
job
は実際の実行対象です. Function に id
と flag
(後述) を生やしたものです.
23export interface SchedulerJob extends Function {
24 id?: number
25 /**
26 * flags can technically be undefined, but it can still be used in bitwise
27 * operations just like 0.
28 */
29 flags?: SchedulerJobFlags
30}
次にこれらを操作する関数についてです.
queue
に job
を追加する queueJob
と,queue
にある job
を実行する queueFlush
, flushJobs
があります.
(flushJobs
は queueFlush
から呼ばれます.)
そして,flushJobs
では queue のジョブを実行した後に,pendingPostFlushCbs
にあるジョブも実行します.
35export function queueJob(job: SchedulerJob): void {
74function queueFlush() {
110function flushJobs() {
また,flushPostFlushCbs
に job
を追加する queuePostFlushCb
と,pendingPostFlushCbs
にある job
を実行する flushPostFlushCbs
があります.
(前述の通りflushPostFlushCbs
は flushJobs
からも呼ばれています.)
57export function queuePostFlushCb(cb: SchedulerJobs): void {
81export function flushPostFlushCbs(): void {
そしてこれらのジョブの実行 (flushJobs) は Promise でラップされており (Promise.resolve().then(flushJobs)
のイメージ),現在のジョブ実行 (Promise) は currentFlushPromise
として管理されています.
そしてこの currentFlushPromise
に then に繋ぐ形でタスクのスケジューリングを行なっています.
そして,皆さんのよく知る nextTick
はこの currentFlushPromise
の then
に cb を登録するだけの関数です.
144export function nextTick<T = void, R = void>(
145 this: T,
146 fn?: (this: T) => R,
147): Promise<Awaited<R>> {
148 const p = currentFlushPromise || resolvedPromise
149 return fn ? p.then(this ? fn.bind(this) : fn) : p
150}
どこで使われている?
実際に queue を操作している実装がどこにあるかを見てもます.
queueJob
Vapor で現状使われているのは 3 箇所です.
161 effect.scheduler = () => queueJob(triggerRenderingUpdate)
65 effect.scheduler = () => {
66 isTriggered = true
67 queueJob(triggerRenderingUpdate)
68 }
41 effect.scheduler = () => queueJob(job)
いずれに共通しているのは,effect.scheduler
に設定しているというところです.
これらが何なのかについては少し先を読んでからにしましょう.
queueFlush
queueFlush に反しては scheduler の実装の内部でしか扱われていません.
どのタイミングでこれらが実行されるかは実装の詳細を見る時に見ていきましょう.
queuePostFlushCb
こちらは何箇所か使われています.
165 invokeLifecycle(
166 instance,
167 VaporLifecycleHooks.UNMOUNTED,
168 'unmounted',
169 instance => queuePostFlushCb(() => (instance.isUnmounted = true)),
170 true,
171 )
16 function invokeCurrent() {
17 cb && cb(instance)
18 const hooks = instance[lifecycle]
19 if (hooks) {
20 const fn = () => {
21 const reset = setCurrentInstance(instance)
22 instance.scope.run(() => invokeArrayFns(hooks))
23 reset()
24 }
25 post ? queuePostFlushCb(fn) : fn()
26 }
27
28 invokeDirectiveHook(instance, directive, instance.scope)
29 }
251 queuePostFlushCb(() => {
252 instance.isUpdating = false
253 const reset = setCurrentInstance(instance)
254 if (dirs) {
255 invokeDirectiveHook(instance, 'updated', scope)
256 }
257 // updated hook
258 if (u) {
259 queuePostFlushCb(u)
260 }
261 reset()
262 })
137 if (!dirs) {
138 const res = handler && handler()
139 if (name === 'mount') {
140 queuePostFlushCb(() => (scope.im = true))
141 }
142 return res
143 }
144
145 invokeDirectiveHook(instance, before, scope)
146 try {
147 if (handler) {
148 return handler()
149 }
150 } finally {
151 queuePostFlushCb(() => {
152 invokeDirectiveHook(instance, after, scope)
153 })
154 }
74 queuePostFlushCb(() => {
75 instance.isUpdating = false
76 const reset = setCurrentInstance(instance)
77 if (dirs) {
78 invokeDirectiveHook(instance, 'updated', scope)
79 }
80 // updated hook
81 if (u) {
82 queuePostFlushCb(u)
83 }
84 reset()
85 })
上記の 5 に共通することは,何らかしらのライフサイクルフックであることです. そのフックに登録されたコールバック関数の実行を pendingPostFlushCbs
に追加しておくようです.
updated や mounted, unmounted などのライフサイクルはその場で実行してしまうとまだ DOM にその内容が反映されていないことがあると思います.
スケジューラによって Promise (イベントループ) をコントロールすることで実行タイミングを制御していそうです. 詳しくはまた実装の詳細と合わせて読んでみましょう.
29export function on(
30 el: Element,
31 event: string,
32 handlerGetter: () => undefined | ((...args: any[]) => any),
33 options: AddEventListenerOptions &
34 ModifierOptions & { effect?: boolean } = {},
35): void {
36 const handler: DelegatedHandler = eventHandler(handlerGetter, options)
37 let cleanupEvent: (() => void) | undefined
38 queuePostFlushCb(() => {
39 cleanupEvent = addEventListener(el, event, handler, options)
40 })
117 queuePostFlushCb(doSet)
118
119 onScopeDispose(() => {
120 queuePostFlushCb(() => {
121 if (isArray(existing)) {
122 remove(existing, refValue)
123 } else if (_isString) {
124 refs[ref] = null
125 if (hasOwn(setupState, ref)) {
126 setupState[ref] = null
127 }
128 } else if (_isRef) {
129 ref.value = null
130 }
131 })
132 })
上記の 2 つに関してはそもそもまだ event や templateRef が登場していないので一旦スルーしましょう.
flushPostFlushCbs
こちらは主に apiRender.ts
で登場します. この本の t ランタイムの解説でも登場しましたね.
コンポーネントをマウントした後で flush しているようです.
106export function render(
107 instance: ComponentInternalInstance,
108 container: string | ParentNode,
109): void {
110 mountComponent(instance, (container = normalizeContainer(container)))
111 flushPostFlushCbs()
112}
アンマウント時も同様です.
155export function unmountComponent(instance: ComponentInternalInstance): void {
156 const { container, block, scope } = instance
157
158 // hook: beforeUnmount
159 invokeLifecycle(instance, VaporLifecycleHooks.BEFORE_UNMOUNT, 'beforeUnmount')
160
161 scope.stop()
162 block && remove(block, container)
163
164 // hook: unmounted
165 invokeLifecycle(
166 instance,
167 VaporLifecycleHooks.UNMOUNTED,
168 'unmounted',
169 instance => queuePostFlushCb(() => (instance.isUnmounted = true)),
170 true,
171 )
172 flushPostFlushCbs()
173}
実装の詳細
さて,続いてはこれら 4 つの関数の実装を見ていきましょう.
queueJob
まずは queueJob
です.
35export function queueJob(job: SchedulerJob): void {
36 let lastOne: SchedulerJob | undefined
37 if (!(job.flags! & SchedulerJobFlags.QUEUED)) {
38 if (job.id == null) {
39 queue.push(job)
40 } else if (
41 // fast path when the job id is larger than the tail
42 !(job.flags! & SchedulerJobFlags.PRE) &&
43 job.id >= (((lastOne = queue[queue.length - 1]) && lastOne.id) || 0)
44 ) {
45 queue.push(job)
46 } else {
47 queue.splice(findInsertionIndex(job.id), 0, job)
48 }
49
50 if (!(job.flags! & SchedulerJobFlags.ALLOW_RECURSE)) {
51 job.flags! |= SchedulerJobFlags.QUEUED
52 }
53 queueFlush()
54 }
55}
引数で渡された job
の flags
をみて,すでにキューに追加されているかどうかを判定しています.
されている場合は無視します.
そして,job
に id
が設定されていない場合は無条件で queue に追加します.
なぜならもうこれは重複排除等の制御が不可能だからです (識別できないので).
それからは,flags
が PRE
出ない場合は末尾に追加し,そうでない場合は然るべき index に挿入します.
その index は findInsertionIndex
で id
を元に探します.
152// #2768
153// Use binary-search to find a suitable position in the queue,
154// so that the queue maintains the increasing order of job's id,
155// which can prevent the job from being skipped and also can avoid repeated patching.
156function findInsertionIndex(id: number) {
157 // the start index should be `flushIndex + 1`
158 let start = flushIndex + 1
159 let end = queue.length
160
161 while (start < end) {
162 const middle = (start + end) >>> 1
163 const middleJob = queue[middle]
164 const middleJobId = getId(middleJob)
165 if (
166 middleJobId < id ||
167 (middleJobId === id && middleJob.flags! & SchedulerJobFlags.PRE)
168 ) {
169 start = middle + 1
170 } else {
171 end = middle
172 }
173 }
174
175 return start
176}
キューは id が増加する順が守られているという仕様なので,二分探索によって高速に位置を特定します.
そこまで終わったら flags
を QUEUED
に設定しておしまいです.
ここでポイントとなるのは,最後に queueFlush()
している点です.
続いて queueFlush
を見ていきましょう.
queueFlush -> flushJobs
74function queueFlush() {
75 if (!isFlushing && !isFlushPending) {
76 isFlushPending = true
77 currentFlushPromise = resolvedPromise.then(flushJobs)
78 }
79}
queueFlush は resolvedPromise.then(flushJobs) を呼び出しているだけです.
この時,flushJobs
は resolvedPromise.then
でラップし,その Promise を currentFlushPromise に設定しておきます.
flushJobs
をみてみましょう.
110function flushJobs() {
111 isFlushPending = false
112 isFlushing = true
113
114 // Sort queue before flush.
115 // This ensures that:
116 // 1. Components are updated from parent to child. (because parent is always
117 // created before the child so its render effect will have smaller
118 // priority number)
119 // 2. If a component is unmounted during a parent component's update,
120 // its update can be skipped.
121 queue.sort(comparator)
122
123 try {
124 for (let i = 0; i < queue!.length; i++) {
125 queue[i]()
126 queue[i].flags! &= ~SchedulerJobFlags.QUEUED
127 }
128 } finally {
129 flushIndex = 0
130 queue.length = 0
131
132 flushPostFlushCbs()
133
134 isFlushing = false
135 currentFlushPromise = null
136 // some postFlushCb queued jobs!
137 // keep flushing until it drains.
138 if (queue.length || pendingPostFlushCbs.length) {
139 flushJobs()
140 }
141 }
142}
まず queue は id によってソートされます.
121 queue.sort(comparator)
181const comparator = (a: SchedulerJob, b: SchedulerJob): number => {
182 const diff = getId(a) - getId(b)
183 if (diff === 0) {
184 const isAPre = a.flags! & SchedulerJobFlags.PRE
185 const isBPre = b.flags! & SchedulerJobFlags.PRE
186 if (isAPre && !isBPre) return -1
187 if (isBPre && !isAPre) return 1
188 }
189 return diff
190}
そうしたら,順に実行していきます.
123 try {
124 for (let i = 0; i < queue!.length; i++) {
125 queue[i]()
126 queue[i].flags! &= ~SchedulerJobFlags.QUEUED
127 }
finally で flushPostFlushCbs
も実行してあげつつ,最後にまた queue
と pendingPostFlushCbs
をみて,まだ job が残っている場合は再度 flushJobs
を再帰的に呼び出します.
128 } finally {
129 flushIndex = 0
130 queue.length = 0
131
132 flushPostFlushCbs()
133
134 isFlushing = false
135 currentFlushPromise = null
136 // some postFlushCb queued jobs!
137 // keep flushing until it drains.
138 if (queue.length || pendingPostFlushCbs.length) {
139 flushJobs()
140 }
queuePostFlushCb
こちらも,対象が pendingPostFlushCbs
になっただけで,基本的な流れは queueJob
と同じです.
57export function queuePostFlushCb(cb: SchedulerJobs): void {
58 if (!isArray(cb)) {
59 if (!(cb.flags! & SchedulerJobFlags.QUEUED)) {
60 pendingPostFlushCbs.push(cb)
61 if (!(cb.flags! & SchedulerJobFlags.ALLOW_RECURSE)) {
62 cb.flags! |= SchedulerJobFlags.QUEUED
63 }
64 }
65 } else {
66 // if cb is an array, it is a component lifecycle hook which can only be
67 // triggered by a job, which is already deduped in the main queue, so
68 // we can skip duplicate check here to improve perf
69 pendingPostFlushCbs.push(...cb)
70 }
71 queueFlush()
72}
キューイング後の flush に関しては queueFlush
であることだけ覚えておけば大丈夫です. (queue
も消化される)
71 queueFlush()
flushPostFlushCbs
81export function flushPostFlushCbs(): void {
82 if (!pendingPostFlushCbs.length) return
83
84 const deduped = [...new Set(pendingPostFlushCbs)]
85 pendingPostFlushCbs.length = 0
86
87 // #1947 already has active queue, nested flushPostFlushCbs call
88 if (activePostFlushCbs) {
89 activePostFlushCbs.push(...deduped)
90 return
91 }
92
93 activePostFlushCbs = deduped
94
95 activePostFlushCbs.sort((a, b) => getId(a) - getId(b))
96
97 for (
98 postFlushIndex = 0;
99 postFlushIndex < activePostFlushCbs.length;
100 postFlushIndex++
101 ) {
102 activePostFlushCbs[postFlushIndex]()
103 activePostFlushCbs[postFlushIndex].flags! &= ~SchedulerJobFlags.QUEUED
104 }
105 activePostFlushCbs = null
106 postFlushIndex = 0
107}
こちらも new Set
によって重複排除を行いつつ.id
によってソートして順に pendingPostFlushCbs を実行しているだけです.
ReactiveEffect と Scheduler
さて,スケジューラに関してもう一箇所,把握しておかなければならないところがあります.
いずれに共通しているのは,
effect.scheduler
に設定しているというところです.
の部分です.
effect が持つ scheduler というオプションいたしして,処理をラップする形で queueJob
を行なっていました.
果たしてこの effect.scheduler
とは何なのでしょうか.
effect
は ReactiveEffect
のインスタンスです.
113export class ReactiveEffect<T = any>
実際に実行したい関数 (fn) を受け取り,インスタンスを生成します.
142 constructor(public fn: () => T) {
そして,ReactiveEffect
を実行するためのメソッドは 2 つあります.run
と trigger
です.
182 run(): T {
226 trigger(): void {
run
の方は,
const effect = new ReactiveEffect(() => console.log("effect"));
effect.run();
のように任意のタイミングで実行することができます.
renderEffect の初回実行時にもこの run
によって実行されています.
37 const effect = new ReactiveEffect(() =>
38 callWithAsyncErrorHandling(cb, instance, VaporErrorCodes.RENDER_FUNCTION),
39 )
40
41 effect.scheduler = () => queueJob(job)
42 if (__DEV__ && instance) {
43 effect.onTrack = instance.rtc
44 ? e => invokeArrayFns(instance.rtc!, e)
45 : void 0
46 effect.onTrigger = instance.rtg
47 ? e => invokeArrayFns(instance.rtg!, e)
48 : void 0
49 }
50 effect.run()
一方で,trigger
の方は基本的にはリアクティビティが形成されている時に使われます.
例えば,ref オブジェクトの set が走った時です.
134 set value(newValue) {
↓
153 this.dep.trigger()
↓
118 trigger(debugInfo?: DebuggerEventExtraInfo): void {
119 this.version++
120 globalVersion++
121 this.notify(debugInfo)
122 }
↓
148 for (let link = this.subs; link; link = link.prevSub) {
149 link.sub.notify()
150 }
↓
173 return this.trigger()
そして,この trigger 関数を見てみると,
226 trigger(): void {
227 if (this.flags & EffectFlags.PAUSED) {
228 pausedQueueEffects.add(this)
229 } else if (this.scheduler) {
230 this.scheduler()
231 } else {
232 this.runIfDirty()
233 }
234 }
scheduler
を持っている場合にはそちらが優先的に実行されるようになっています.
これは,ある依存関係を元にリアクティブにエフェクトがトリガーされる際,余計な実行が起こらないようにするための仕組みです.scheduler
プロパティにはスケジューラにキューする処理を適切に設定し,作用の実行を最適化することができます.
例えば,renderEffect
の実装を見てみましょう.renderEffect
では,scheduler に () => queueJob(job)
を設定しています.
37 const effect = new ReactiveEffect(() =>
38 callWithAsyncErrorHandling(cb, instance, VaporErrorCodes.RENDER_FUNCTION),
39 )
40
41 effect.scheduler = () => queueJob(job)
そして,以下のように renderEffect を呼び出していたとします.
const t0 = template("<p></p>");
const n0 = t0();
const count = ref(0);
const effect = () => setText(n0, count.value);
renderEffect(effect);
こうすると,count
に effect
(をラップした job
) が track され,count が変更された際にその job
を trigger
します.trigger
じには内部で設定された scheduler
プロパティの方が実行されますが,今回は「job
の実行」ではなく,あくまで「job
を queue
に追加する」が設定されているため,即時には実行されずにスケジューラに渡されます.
ここで,このような trigger を考えてみましょう.
const count = ref(0);
const effect = () => setText(n0, count.value);
renderEffect(effect);
count.value = 1; // enqueue job
count.value = 2; // enqueue job
このようにすると 2 回 () => queueJob(job)
が実行されることになります.
そして,queueJob
の実装を思い出して欲しいのですが,
37 if (!(job.flags! & SchedulerJobFlags.QUEUED)) {
すでにその job が追加されている場合は無視されるようになっています.
この関数の最後に queueFlush
を実行しているので,毎回空っぽになりそうな気はしますが,実はこれは Promise で繋がっているため,この時点ではまだ flush
されておらず,queue
には job
が残っている状態になります.
これにより,イベントループを媒介した job の重複排除を実現し,不要な実行を抑制することができます.
実際に,考えてみて欲しいのですが,
count.value = 1;
count.value = 2;
と書いたところで,画面的には,2 回目の
setText(n0, 2);
だけの実行で問題ないはずです.
これであらかたスケジューラの理解はできたはずです.
余計な作用の実行を制御するために Promise と queue
を活用し,ライフサイクルフックの実行で画面の更新などを待った後で実行するのが正しいものは pendingPostFlushCbs
という別のキューを用意し,実行タイミングを制御しています.