Scheduler
So far, the term "scheduler" has come up several times.
In this page, we will take a closer look at that scheduler.
What is a Scheduler
A scheduler is something that schedules and executes tasks.
Sometimes it's about adjusting execution timing, sometimes it's about queuing.
Operating systems also have schedulers that schedule processes.
Vue.js also has mechanisms to schedule various actions.
This concept originates not only from vuejs/core-vapor (runtime-vapor) but also from vuejs/core (runtime-core).
For example, the well-known nextTick
is an API of this scheduler.
https://vuejs.org/api/general.html#nexttick
Additionally, the flush
option that can be set as an option in watchers like watch
and watchEffect
is also related to scheduling execution.
https://vuejs.org/api/reactivity-core.html#watch
44export interface WatchEffectOptions extends DebuggerOptions {
45 flush?: 'pre' | 'post' | 'sync'
46}
Overview of the Scheduler API
Before looking into detailed implementations, let's see how the scheduler is actually used.
This is about how Vue.js internally uses it, not an API that Vue.js users directly use.
The implementation of the scheduler is in packages/runtime-vapor/src/scheduler.ts.
First, the basic structure includes a queue
and job
.
And there are two types of queues.queue
and pendingPostFlushCbs
.
22const queue: SchedulerJob[] = []
23let flushIndex = 0
28const pendingPostFlushCbs: SchedulerJob[] = []
29let activePostFlushCbs: SchedulerJob[] | null = null
30let postFlushIndex = 0
Here, it manages the queued jobs and the current execution index.
job
is the actual execution target.
It's a function with id
and flag
(to be discussed later) attached.
23export interface SchedulerJob extends Function {
24 id?: number
25 /**
26 * flags can technically be undefined, but it can still be used in bitwise
27 * operations just like 0.
28 */
29 flags?: SchedulerJobFlags
30}
Next, about the functions that manipulate these.
There are queueJob
which adds a job
to the queue
, queueFlush
and flushJobs
which execute the jobs
in the queue
.
(flushJobs
is called from queueFlush
.)
Then, in flushJobs
, after executing the jobs in the queue, it also executes the jobs in pendingPostFlushCbs
.
35export function queueJob(job: SchedulerJob): void {
74function queueFlush() {
110function flushJobs() {
Also, there are queuePostFlushCb
which adds a job
to flushPostFlushCbs
, and flushPostFlushCbs
which executes the jobs
in pendingPostFlushCbs
.
(as mentioned before, flushPostFlushCbs
is also called from flushJobs
.)
57export function queuePostFlushCb(cb: SchedulerJobs): void {
81export function flushPostFlushCbs(): void {
And the execution of these jobs (flushJobs) is wrapped in a Promise (like Promise.resolve().then(flushJobs)
), and the current job execution (Promise) is managed as currentFlushPromise
.
Then, task scheduling is done by connecting to the then
of this currentFlushPromise
.
And the well-known nextTick
is just a function that registers a callback to the then
of this currentFlushPromise
.
144export function nextTick<T = void, R = void>(
145 this: T,
146 fn?: (this: T) => R,
147): Promise<Awaited<R>> {
148 const p = currentFlushPromise || resolvedPromise
149 return fn ? p.then(this ? fn.bind(this) : fn) : p
150}
Where is it Used?
Let's see where the implementation that manipulates the queue is.
queueJob
Currently, in Vapor, it's used in three places.
161 effect.scheduler = () => queueJob(triggerRenderingUpdate)
65 effect.scheduler = () => {
66 isTriggered = true
67 queueJob(triggerRenderingUpdate)
68 }
41 effect.scheduler = () => queueJob(job)
What's common among these is that they are set in effect.scheduler
.
Let's read a bit ahead about what these are.
queueFlush
Contrary to queueJob
, queueFlush
is only handled internally in the scheduler implementation.
When these are executed depends on when we look at the implementation details.
queuePostFlushCb
This is used in several places.
165 invokeLifecycle(
166 instance,
167 VaporLifecycleHooks.UNMOUNTED,
168 'unmounted',
169 instance => queuePostFlushCb(() => (instance.isUnmounted = true)),
170 true,
171 )
16 function invokeCurrent() {
17 cb && cb(instance)
18 const hooks = instance[lifecycle]
19 if (hooks) {
20 const fn = () => {
21 const reset = setCurrentInstance(instance)
22 instance.scope.run(() => invokeArrayFns(hooks))
23 reset()
24 }
25 post ? queuePostFlushCb(fn) : fn()
26 }
27
28 invokeDirectiveHook(instance, directive, instance.scope)
29 }
251 queuePostFlushCb(() => {
252 instance.isUpdating = false
253 const reset = setCurrentInstance(instance)
254 if (dirs) {
255 invokeDirectiveHook(instance, 'updated', scope)
256 }
257 // updated hook
258 if (u) {
259 queuePostFlushCb(u)
260 }
261 reset()
262 })
137 if (!dirs) {
138 const res = handler && handler()
139 if (name === 'mount') {
140 queuePostFlushCb(() => (scope.im = true))
141 }
142 return res
143 }
144
145 invokeDirectiveHook(instance, before, scope)
146 try {
147 if (handler) {
148 return handler()
149 }
150 } finally {
151 queuePostFlushCb(() => {
152 invokeDirectiveHook(instance, after, scope)
153 })
154 }
74 queuePostFlushCb(() => {
75 instance.isUpdating = false
76 const reset = setCurrentInstance(instance)
77 if (dirs) {
78 invokeDirectiveHook(instance, 'updated', scope)
79 }
80 // updated hook
81 if (u) {
82 queuePostFlushCb(u)
83 }
84 reset()
85 })
What is common among the above five is that they are some kind of lifecycle hooks.
It seems that they add the execution of the callback functions registered in those hooks to pendingPostFlushCbs
.
Lifecycle hooks like updated
, mounted
, unmounted
, etc., might not have the DOM updated yet if executed immediately.
By controlling the execution timing via the scheduler and Promises (event loop), it seems to manage the execution timing.
We will read more about the implementation details together later.
29export function on(
30 el: Element,
31 event: string,
32 handlerGetter: () => undefined | ((...args: any[]) => any),
33 options: AddEventListenerOptions &
34 ModifierOptions & { effect?: boolean } = {},
35): void {
36 const handler: DelegatedHandler = eventHandler(handlerGetter, options)
37 let cleanupEvent: (() => void) | undefined
38 queuePostFlushCb(() => {
39 cleanupEvent = addEventListener(el, event, handler, options)
40 })
117 queuePostFlushCb(doSet)
118
119 onScopeDispose(() => {
120 queuePostFlushCb(() => {
121 if (isArray(existing)) {
122 remove(existing, refValue)
123 } else if (_isString) {
124 refs[ref] = null
125 if (hasOwn(setupState, ref)) {
126 setupState[ref] = null
127 }
128 } else if (_isRef) {
129 ref.value = null
130 }
131 })
132 })
As for the above two, since event
and templateRef
haven't been introduced yet, let's skip them for now.
flushPostFlushCbs
This mainly appears in apiRender.ts
. It also appeared in the t runtime explanation of this book.
It seems to flush after mounting the component.
106export function render(
107 instance: ComponentInternalInstance,
108 container: string | ParentNode,
109): void {
110 mountComponent(instance, (container = normalizeContainer(container)))
111 flushPostFlushCbs()
112}
Similarly, during unmounting.
155export function unmountComponent(instance: ComponentInternalInstance): void {
156 const { container, block, scope } = instance
157
158 // hook: beforeUnmount
159 invokeLifecycle(instance, VaporLifecycleHooks.BEFORE_UNMOUNT, 'beforeUnmount')
160
161 scope.stop()
162 block && remove(block, container)
163
164 // hook: unmounted
165 invokeLifecycle(
166 instance,
167 VaporLifecycleHooks.UNMOUNTED,
168 'unmounted',
169 instance => queuePostFlushCb(() => (instance.isUnmounted = true)),
170 true,
171 )
172 flushPostFlushCbs()
173}
Implementation Details
Now, let's look at the implementations of these four functions.
queueJob
First, queueJob
.
35export function queueJob(job: SchedulerJob): void {
36 let lastOne: SchedulerJob | undefined
37 if (!(job.flags! & SchedulerJobFlags.QUEUED)) {
38 if (job.id == null) {
39 queue.push(job)
40 } else if (
41 // fast path when the job id is larger than the tail
42 !(job.flags! & SchedulerJobFlags.PRE) &&
43 job.id >= (((lastOne = queue[queue.length - 1]) && lastOne.id) || 0)
44 ) {
45 queue.push(job)
46 } else {
47 queue.splice(findInsertionIndex(job.id), 0, job)
48 }
49
50 if (!(job.flags! & SchedulerJobFlags.ALLOW_RECURSE)) {
51 job.flags! |= SchedulerJobFlags.QUEUED
52 }
53 queueFlush()
54 }
55}
It checks the flags
of the job
passed as an argument to determine whether it has already been added to the queue.
If it has, it ignores it.
Then, if the job
does not have an id
set, it adds it to the queue unconditionally.
Because it's impossible to control deduplication and the like (since it can't be identified).
After that, if flags
are not PRE
, it adds it to the end; otherwise, it inserts it at the appropriate index.
That index is found based on id
using findInsertionIndex
.
152// #2768
153// Use binary-search to find a suitable position in the queue,
154// so that the queue maintains the increasing order of job's id,
155// which can prevent the job from being skipped and also can avoid repeated patching.
156function findInsertionIndex(id: number) {
157 // the start index should be `flushIndex + 1`
158 let start = flushIndex + 1
159 let end = queue.length
160
161 while (start < end) {
162 const middle = (start + end) >>> 1
163 const middleJob = queue[middle]
164 const middleJobId = getId(middleJob)
165 if (
166 middleJobId < id ||
167 (middleJobId === id && middleJob.flags! & SchedulerJobFlags.PRE)
168 ) {
169 start = middle + 1
170 } else {
171 end = middle
172 }
173 }
174
175 return start
176}
Since the queue is specified to maintain the order of increasing id
, it uses binary search to quickly determine the position.
Once that's done, it sets flags
to QUEUED
and finishes.
The key point here is that it finally calls queueFlush()
.
Next, let's look at queueFlush
.
queueFlush -> flushJobs
74function queueFlush() {
75 if (!isFlushing && !isFlushPending) {
76 isFlushPending = true
77 currentFlushPromise = resolvedPromise.then(flushJobs)
78 }
79}
queueFlush
simply calls resolvedPromise.then(flushJobs)
.
At this point, flushJobs
is wrapped with resolvedPromise.then
, and that Promise is set to currentFlushPromise
.
Let's take a look at flushJobs
.
110function flushJobs() {
111 isFlushPending = false
112 isFlushing = true
113
114 // Sort queue before flush.
115 // This ensures that:
116 // 1. Components are updated from parent to child. (because parent is always
117 // created before the child so its render effect will have smaller
118 // priority number)
119 // 2. If a component is unmounted during a parent component's update,
120 // its update can be skipped.
121 queue.sort(comparator)
122
123 try {
124 for (let i = 0; i < queue!.length; i++) {
125 queue[i]()
126 queue[i].flags! &= ~SchedulerJobFlags.QUEUED
127 }
128 } finally {
129 flushIndex = 0
130 queue.length = 0
131
132 flushPostFlushCbs()
133
134 isFlushing = false
135 currentFlushPromise = null
136 // some postFlushCb queued jobs!
137 // keep flushing until it drains.
138 if (queue.length || pendingPostFlushCbs.length) {
139 flushJobs()
140 }
141 }
142}
First, the queue is sorted by id
.
121 queue.sort(comparator)
181const comparator = (a: SchedulerJob, b: SchedulerJob): number => {
182 const diff = getId(a) - getId(b)
183 if (diff === 0) {
184 const isAPre = a.flags! & SchedulerJobFlags.PRE
185 const isBPre = b.flags! & SchedulerJobFlags.PRE
186 if (isAPre && !isBPre) return -1
187 if (isBPre && !isAPre) return 1
188 }
189 return diff
190}
Then, they are executed in order.
123 try {
124 for (let i = 0; i < queue!.length; i++) {
125 queue[i]()
126 queue[i].flags! &= ~SchedulerJobFlags.QUEUED
127 }
In the finally
block, it also executes flushPostFlushCbs
, and finally, it checks queue
and pendingPostFlushCbs
again; if there are still jobs remaining, it recursively calls flushJobs
again.
128 } finally {
129 flushIndex = 0
130 queue.length = 0
131
132 flushPostFlushCbs()
133
134 isFlushing = false
135 currentFlushPromise = null
136 // some postFlushCb queued jobs!
137 // keep flushing until it drains.
138 if (queue.length || pendingPostFlushCbs.length) {
139 flushJobs()
140 }
queuePostFlushCb
Similarly, the target is pendingPostFlushCbs
, and the basic flow is the same as queueJob
.
57export function queuePostFlushCb(cb: SchedulerJobs): void {
58 if (!isArray(cb)) {
59 if (!(cb.flags! & SchedulerJobFlags.QUEUED)) {
60 pendingPostFlushCbs.push(cb)
61 if (!(cb.flags! & SchedulerJobFlags.ALLOW_RECURSE)) {
62 cb.flags! |= SchedulerJobFlags.QUEUED
63 }
64 }
65 } else {
66 // if cb is an array, it is a component lifecycle hook which can only be
67 // triggered by a job, which is already deduped in the main queue, so
68 // we can skip duplicate check here to improve perf
69 pendingPostFlushCbs.push(...cb)
70 }
71 queueFlush()
72}
For flushing after queuing, just remember it's queueFlush
. (queue
is also consumed)
71 queueFlush()
flushPostFlushCbs
81export function flushPostFlushCbs(): void {
82 if (!pendingPostFlushCbs.length) return
83
84 const deduped = [...new Set(pendingPostFlushCbs)]
85 pendingPostFlushCbs.length = 0
86
87 // #1947 already has active queue, nested flushPostFlushCbs call
88 if (activePostFlushCbs) {
89 activePostFlushCbs.push(...deduped)
90 return
91 }
92
93 activePostFlushCbs = deduped
94
95 activePostFlushCbs.sort((a, b) => getId(a) - getId(b))
96
97 for (
98 postFlushIndex = 0;
99 postFlushIndex < activePostFlushCbs.length;
100 postFlushIndex++
101 ) {
102 activePostFlushCbs[postFlushIndex]()
103 activePostFlushCbs[postFlushIndex].flags! &= ~SchedulerJobFlags.QUEUED
104 }
105 activePostFlushCbs = null
106 postFlushIndex = 0
107}
Also, it removes duplicates using new Set
, sorts by id
, and executes pendingPostFlushCbs
in order.
ReactiveEffect and Scheduler
Now, there is one more aspect of the scheduler that we need to understand.
What is common among them is that they are set in
effect.scheduler
.
This is the part.
The scheduler
option that the effect has is used to wrap the processing in the form of queueJob
.
So, what exactly is this effect.scheduler
?
effect
is an instance of ReactiveEffect
.
113export class ReactiveEffect<T = any>
It receives the function (fn) that you want to execute and creates an instance.
142 constructor(public fn: () => T) {
And there are two methods to execute a ReactiveEffect
:run
and trigger
.
182 run(): T {
226 trigger(): void {
The run
method can be executed at any desired time, such as:
const effect = new ReactiveEffect(() => console.log("effect"));
effect.run();
It is also executed via this run
during the initial execution of renderEffect
.
37 const effect = new ReactiveEffect(() =>
38 callWithAsyncErrorHandling(cb, instance, VaporErrorCodes.RENDER_FUNCTION),
39 )
40
41 effect.scheduler = () => queueJob(job)
42 if (__DEV__ && instance) {
43 effect.onTrack = instance.rtc
44 ? e => invokeArrayFns(instance.rtc!, e)
45 : void 0
46 effect.onTrigger = instance.rtg
47 ? e => invokeArrayFns(instance.rtg!, e)
48 : void 0
49 }
50 effect.run()
On the other hand, trigger
is primarily used when reactivity is established.
For example, when the set
method of a ref object is called.
134 set value(newValue) {
↓
153 this.dep.trigger()
↓
118 trigger(debugInfo?: DebuggerEventExtraInfo): void {
119 this.version++
120 globalVersion++
121 this.notify(debugInfo)
122 }
↓
148 for (let link = this.subs; link; link = link.prevSub) {
149 link.sub.notify()
150 }
↓
173 return this.trigger()
When looking at the trigger
function,
226 trigger(): void {
227 if (this.flags & EffectFlags.PAUSED) {
228 pausedQueueEffects.add(this)
229 } else if (this.scheduler) {
230 this.scheduler()
231 } else {
232 this.runIfDirty()
233 }
234 }
if it has a scheduler
, it is given priority to execute.
This mechanism ensures that unnecessary executions do not occur when reactive effects are triggered based on certain dependencies.
The scheduler
property allows you to appropriately set up the processing to be queued by the scheduler, optimizing the execution of effects.
For example, let's look at the implementation of renderEffect
.
It sets () => queueJob(job)
as the scheduler
.
37 const effect = new ReactiveEffect(() =>
38 callWithAsyncErrorHandling(cb, instance, VaporErrorCodes.RENDER_FUNCTION),
39 )
40
41 effect.scheduler = () => queueJob(job)
And suppose renderEffect
is called as follows.
const t0 = template("<p></p>");
const n0 = t0();
const count = ref(0);
const effect = () => setText(n0, count.value);
renderEffect(effect);
This way, the effect
(a job that wraps effect
) is tracked by count
, and when count
changes, it triggers that job.
When trigger
is called, the scheduler
property set internally is executed, but in this case, it is set to "add the job to the queue" rather than "execute the job", so it is not executed immediately but passed to the scheduler.
Now, let's consider such a trigger.
const count = ref(0);
const effect = () => setText(n0, count.value);
renderEffect(effect);
count.value = 1; // enqueue job
count.value = 2; // enqueue job
Doing so will execute () => queueJob(job)
twice.
And recalling the implementation of queueJob
,
37 if (!(job.flags! & SchedulerJobFlags.QUEUED)) {
if the job is already added, it will be ignored.
Since this function executes queueFlush
at the end, you might think the queue would be emptied each time, but actually, because it is connected via a Promise, the flush
has not yet occurred at this point, and the job
remains in the queue.
This achieves deduplication of jobs mediated by the event loop, preventing unnecessary executions.
In fact, consider the following:
count.value = 1;
count.value = 2;
Even though written like this, visually, only the second
setText(n0, 2);
should be executed, which is fine.
With this, you should have a general understanding of the scheduler.
To control the execution of unnecessary effects, Promises and the queue
are utilized, and to properly execute after waiting for updates to the screen and other actions through lifecycle hooks, a separate queue called pendingPostFlushCbs
is prepared to control the execution timing.