Scheduler

So far, the term "scheduler" has come up several times.
On this page, we will take a closer look at it.

What is a Scheduler

A scheduler is something that schedules and executes tasks.
Sometimes it's about adjusting execution timing, sometimes it's about queuing.

Operating systems also have schedulers that schedule processes.

Vue.js also has mechanisms to schedule various actions.
This concept is not specific to vuejs/core-vapor (runtime-vapor); it also exists in vuejs/core (runtime-core).
For example, the well-known nextTick is an API of this scheduler.

https://vuejs.org/api/general.html#nexttick

Additionally, the flush option accepted by watchers such as watch and watchEffect also controls when execution is scheduled.

https://vuejs.org/api/reactivity-core.html#watch

export interface WatchEffectOptions extends DebuggerOptions {
  flush?: 'pre' | 'post' | 'sync'
}

Overview of the Scheduler API

Before looking into detailed implementations, let's see how the scheduler is actually used.
This is about how Vue.js internally uses it, not an API that Vue.js users directly use.

The implementation of the scheduler is in packages/runtime-vapor/src/scheduler.ts.

First, the basic structure consists of queues and jobs.
There are two queues: queue and pendingPostFlushCbs.

const queue: SchedulerJob[] = []
let flushIndex = 0

const pendingPostFlushCbs: SchedulerJob[] = []
let activePostFlushCbs: SchedulerJob[] | null = null
let postFlushIndex = 0

These variables hold the queued jobs and the index of the job currently being executed.

A job is the actual execution target.
It's a function with an id and flags (to be discussed later) attached.

export interface SchedulerJob extends Function {
  id?: number
  /**
   * flags can technically be undefined, but it can still be used in bitwise
   * operations just like 0.
   */
  flags?: SchedulerJobFlags
}

Next, about the functions that manipulate these.

There are queueJob which adds a job to the queue, queueFlush and flushJobs which execute the jobs in the queue.
(flushJobs is called from queueFlush.)
Then, in flushJobs, after executing the jobs in the queue, it also executes the jobs in pendingPostFlushCbs.

export function queueJob(job: SchedulerJob): void {
function flushJobs() {

There are also queuePostFlushCb, which adds a job to pendingPostFlushCbs, and flushPostFlushCbs, which executes the jobs in pendingPostFlushCbs.
(As mentioned before, flushPostFlushCbs is also called from flushJobs.)

export function queuePostFlushCb(cb: SchedulerJobs): void {
export function flushPostFlushCbs(): void {
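
The relationship between the two queues can be sketched as follows. This is a simplified model, not the Vapor source: miniQueue, miniPostCbs, enqueue, enqueuePost, and flushAll are hypothetical names, and the flush is triggered by hand instead of through a Promise.

```ts
type SimpleJob = () => void

// Hypothetical stand-ins for `queue` and `pendingPostFlushCbs`.
const miniQueue: SimpleJob[] = []
const miniPostCbs: SimpleJob[] = []
const order: string[] = []

function enqueue(job: SimpleJob): void {
  miniQueue.push(job)
}

function enqueuePost(cb: SimpleJob): void {
  miniPostCbs.push(cb)
}

// Runs the main queue first, then the post-flush callbacks.
function flushAll(): void {
  for (const job of miniQueue) job()
  miniQueue.length = 0
  const cbs = miniPostCbs.splice(0) // take a snapshot and clear the queue
  for (const cb of cbs) cb()
}

// The post-flush callback is registered first but still runs last.
enqueuePost(() => order.push('post: mounted hook'))
enqueue(() => order.push('job: render update'))
flushAll()
console.log(order)
```

Regardless of registration order, the main-queue job runs before the post-flush callback, which is the property the lifecycle hooks rely on.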

And the execution of these jobs (flushJobs) is wrapped in a Promise (like Promise.resolve().then(flushJobs)), and the current job execution (Promise) is managed as currentFlushPromise.
Then, task scheduling is done by connecting to the then of this currentFlushPromise.

And the well-known nextTick is just a function that registers a callback to the then of this currentFlushPromise.

export function nextTick<T = void, R = void>(
  this: T,
  fn?: (this: T) => R,
): Promise<Awaited<R>> {
  const p = currentFlushPromise || resolvedPromise
  return fn ? p.then(this ? fn.bind(this) : fn) : p
}
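
To see why chaining onto the flush Promise is enough, here is a minimal sketch of the same idea. The names fakeFlush, scheduleFlush, and miniNextTick are hypothetical and only stand in for flushJobs, queueFlush, and nextTick; the real functions do more.

```ts
const resolved: Promise<void> = Promise.resolve()
let currentFlush: Promise<void> | null = null
const steps: string[] = []

// Hypothetical flush function standing in for flushJobs.
function fakeFlush(): void {
  steps.push('flush')
  currentFlush = null
}

// Schedules the flush in a microtask and remembers the resulting Promise.
function scheduleFlush(): void {
  if (!currentFlush) currentFlush = resolved.then(fakeFlush)
}

// Same idea as nextTick: chain onto the pending flush if there is one.
function miniNextTick(fn: () => void): Promise<void> {
  const p = currentFlush || resolved
  return p.then(fn)
}

scheduleFlush()
const tickDone = miniNextTick(() => {
  steps.push('nextTick callback')
})
steps.push('sync code')
tickDone.then(() => console.log(steps))
```

Because the callback is attached to the Promise of the pending flush, it cannot run before the flush has finished, while synchronous code after the call still runs first.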

Where is it Used?

Let's see where the functions that manipulate the queues are called.

queueJob

Currently, in Vapor, it's used in three places.

effect.scheduler = () => queueJob(triggerRenderingUpdate)

effect.scheduler = () => {
  isTriggered = true
  queueJob(triggerRenderingUpdate)
}

effect.scheduler = () => queueJob(job)

What's common among these is that they are set in effect.scheduler.
We will look at what that means a little later.

queueFlush

Unlike queueJob, queueFlush is only called inside the scheduler implementation itself.
We will see when it is executed once we look at the implementation details.

queuePostFlushCb

This is used in several places.

invokeLifecycle(
  instance,
  VaporLifecycleHooks.UNMOUNTED,
  'unmounted',
  instance => queuePostFlushCb(() => (instance.isUnmounted = true)),
  true,
)

function invokeCurrent() {
  cb && cb(instance)
  const hooks = instance[lifecycle]
  if (hooks) {
    const fn = () => {
      const reset = setCurrentInstance(instance)
      instance.scope.run(() => invokeArrayFns(hooks))
      reset()
    }
    post ? queuePostFlushCb(fn) : fn()
  }

  invokeDirectiveHook(instance, directive, instance.scope)
}

queuePostFlushCb(() => {
  instance.isUpdating = false
  const reset = setCurrentInstance(instance)
  if (dirs) {
    invokeDirectiveHook(instance, 'updated', scope)
  }
  // updated hook
  if (u) {
    queuePostFlushCb(u)
  }
  reset()
})

if (!dirs) {
  const res = handler && handler()
  if (name === 'mount') {
    queuePostFlushCb(() => (scope.im = true))
  }
  return res
}

invokeDirectiveHook(instance, before, scope)
try {
  if (handler) {
    return handler()
  }
} finally {
  queuePostFlushCb(() => {
    invokeDirectiveHook(instance, after, scope)
  })
}

queuePostFlushCb(() => {
  instance.isUpdating = false
  const reset = setCurrentInstance(instance)
  if (dirs) {
    invokeDirectiveHook(instance, 'updated', scope)
  }
  // updated hook
  if (u) {
    queuePostFlushCb(u)
  }
  reset()
})

What is common among the above five is that they are some kind of lifecycle hooks.
It seems that they add the execution of the callback functions registered in those hooks to pendingPostFlushCbs.

If lifecycle hooks such as updated, mounted, and unmounted were executed immediately, the DOM might not have been updated yet.
Deferring them through the scheduler and Promises (the event loop) appears to ensure that they run only after the DOM updates are done.
We will read more about the implementation details together later.

export function on(
  el: Element,
  event: string,
  handlerGetter: () => undefined | ((...args: any[]) => any),
  options: AddEventListenerOptions &
    ModifierOptions & { effect?: boolean } = {},
): void {
  const handler: DelegatedHandler = eventHandler(handlerGetter, options)
  let cleanupEvent: (() => void) | undefined
  queuePostFlushCb(() => {
    cleanupEvent = addEventListener(el, event, handler, options)
  })

queuePostFlushCb(doSet)

onScopeDispose(() => {
  queuePostFlushCb(() => {
    if (isArray(existing)) {
      remove(existing, refValue)
    } else if (_isString) {
      refs[ref] = null
      if (hasOwn(setupState, ref)) {
        setupState[ref] = null
      }
    } else if (_isRef) {
      ref.value = null
    }
  })
})

As for the above two, since event and templateRef haven't been introduced yet, let's skip them for now.

flushPostFlushCbs

This mainly appears in apiRender.ts. It also appeared in the runtime explanation of this book.

It seems to flush after mounting the component.

export function render(
  instance: ComponentInternalInstance,
  container: string | ParentNode,
): void {
  mountComponent(instance, (container = normalizeContainer(container)))
  flushPostFlushCbs()
}

Similarly, during unmounting.

export function unmountComponent(instance: ComponentInternalInstance): void {
  const { container, block, scope } = instance

  // hook: beforeUnmount
  invokeLifecycle(instance, VaporLifecycleHooks.BEFORE_UNMOUNT, 'beforeUnmount')

  scope.stop()
  block && remove(block, container)

  // hook: unmounted
  invokeLifecycle(
    instance,
    VaporLifecycleHooks.UNMOUNTED,
    'unmounted',
    instance => queuePostFlushCb(() => (instance.isUnmounted = true)),
    true,
  )
  flushPostFlushCbs()
}

Implementation Details

Now, let's look at the implementations of these four functions.

queueJob

First, queueJob.

export function queueJob(job: SchedulerJob): void {
  let lastOne: SchedulerJob | undefined
  if (!(job.flags! & SchedulerJobFlags.QUEUED)) {
    if (job.id == null) {
      queue.push(job)
    } else if (
      // fast path when the job id is larger than the tail
      !(job.flags! & SchedulerJobFlags.PRE) &&
      job.id >= (((lastOne = queue[queue.length - 1]) && lastOne.id) || 0)
    ) {
      queue.push(job)
    } else {
      queue.splice(findInsertionIndex(job.id), 0, job)
    }

    if (!(job.flags! & SchedulerJobFlags.ALLOW_RECURSE)) {
      job.flags! |= SchedulerJobFlags.QUEUED
    }
    queueFlush()
  }
}

It checks the flags of the job passed as an argument to determine whether it has already been added to the queue.
If it has, the call is ignored.

Then, if the job does not have an id set, it is appended to the end of the queue unconditionally,
because without an id there is nothing to order it by.

Otherwise, if the job is not flagged PRE and its id is greater than or equal to the id of the last job in the queue, it is also appended to the end (the fast path); in all other cases it is inserted at the appropriate index.
That index is found based on id using findInsertionIndex.

// #2768
// Use binary-search to find a suitable position in the queue,
// so that the queue maintains the increasing order of job's id,
// which can prevent the job from being skipped and also can avoid repeated patching.
function findInsertionIndex(id: number) {
  // the start index should be `flushIndex + 1`
  let start = flushIndex + 1
  let end = queue.length

  while (start < end) {
    const middle = (start + end) >>> 1
    const middleJob = queue[middle]
    const middleJobId = getId(middleJob)
    if (
      middleJobId < id ||
      (middleJobId === id && middleJob.flags! & SchedulerJobFlags.PRE)
    ) {
      start = middle + 1
    } else {
      end = middle
    }
  }

  return start
}

Since the queue must stay sorted in increasing order of id, binary search is used to find the insertion position quickly.
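
As a worked example of the insertion, here is a reduced sketch of the binary search. It assumes a plain array of jobs that all have an id, and it leaves out the flushIndex offset and the PRE handling of the real findInsertionIndex; IdJob and findIndexById are hypothetical names.

```ts
interface IdJob {
  id: number
  label: string
}

// Returns the first index whose job id is not smaller than `id`.
function findIndexById(sorted: IdJob[], id: number): number {
  let start = 0
  let end = sorted.length
  while (start < end) {
    const middle = (start + end) >>> 1
    if (sorted[middle].id < id) {
      start = middle + 1
    } else {
      end = middle
    }
  }
  return start
}

const sortedJobs: IdJob[] = [
  { id: 1, label: 'parent' },
  { id: 3, label: 'child A' },
  { id: 7, label: 'child B' },
]
const incoming: IdJob = { id: 4, label: 'late job' }

// Insert at the computed index so the array stays ordered by id.
sortedJobs.splice(findIndexById(sortedJobs, incoming.id), 0, incoming)
console.log(sortedJobs.map(j => j.id)) // [1, 3, 4, 7]
```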

Once that's done, it sets the QUEUED flag (unless the job is flagged ALLOW_RECURSE) and finishes.
The key point here is that the last thing it does is call queueFlush().
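
The flag handling can be tried in isolation. The sketch below assumes made-up bit values (FLAG_QUEUED, FLAG_ALLOW_RECURSE) and a hypothetical tryQueue helper; the real values are defined in SchedulerJobFlags and the real queueJob also orders the queue and calls queueFlush.

```ts
// Hypothetical bit values for illustration only.
const FLAG_QUEUED = 1 << 0
const FLAG_ALLOW_RECURSE = 1 << 2

interface FlagJob {
  (): void
  flags?: number
}

const accepted: FlagJob[] = []

// Returns true if the job was added, false if it was ignored as a duplicate.
function tryQueue(job: FlagJob): boolean {
  // `undefined & n` evaluates to 0, so a job without flags counts as "not queued".
  if (job.flags! & FLAG_QUEUED) return false
  accepted.push(job)
  if (!(job.flags! & FLAG_ALLOW_RECURSE)) {
    job.flags = (job.flags || 0) | FLAG_QUEUED
  }
  return true
}

const normalJob: FlagJob = () => {}
const recursiveJob: FlagJob = () => {}
recursiveJob.flags = FLAG_ALLOW_RECURSE

console.log(tryQueue(normalJob), tryQueue(normalJob)) // true false
console.log(tryQueue(recursiveJob), tryQueue(recursiveJob)) // true true
```

A normal job is rejected the second time because its QUEUED bit is set, while a job flagged ALLOW_RECURSE never gets that bit and can therefore be queued again.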

Next, let's look at queueFlush.

queueFlush -> flushJobs

function queueFlush() {
  if (!isFlushing && !isFlushPending) {
    isFlushPending = true
    currentFlushPromise = resolvedPromise.then(flushJobs)
  }
}

If no flush is already running or pending, queueFlush calls resolvedPromise.then(flushJobs).
flushJobs is thus deferred to a microtask via resolvedPromise.then, and the resulting Promise is stored in currentFlushPromise.

Let's take a look at flushJobs.

function flushJobs() {
  isFlushPending = false
  isFlushing = true

  // Sort queue before flush.
  // This ensures that:
  // 1. Components are updated from parent to child. (because parent is always
  //    created before the child so its render effect will have smaller
  //    priority number)
  // 2. If a component is unmounted during a parent component's update,
  //    its update can be skipped.
  queue.sort(comparator)

  try {
    for (let i = 0; i < queue!.length; i++) {
      queue[i]()
      queue[i].flags! &= ~SchedulerJobFlags.QUEUED
    }
  } finally {
    flushIndex = 0
    queue.length = 0

    flushPostFlushCbs()

    isFlushing = false
    currentFlushPromise = null
    // some postFlushCb queued jobs!
    // keep flushing until it drains.
    if (queue.length || pendingPostFlushCbs.length) {
      flushJobs()
    }
  }
}

First, the queue is sorted by id. When two jobs have the same id, the one flagged PRE comes first.

queue.sort(comparator)

const comparator = (a: SchedulerJob, b: SchedulerJob): number => {
  const diff = getId(a) - getId(b)
  if (diff === 0) {
    const isAPre = a.flags! & SchedulerJobFlags.PRE
    const isBPre = b.flags! & SchedulerJobFlags.PRE
    if (isAPre && !isBPre) return -1
    if (isBPre && !isAPre) return 1
  }
  return diff
}
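
To make the resulting order concrete, the following sketch applies the same comparison logic to plain objects. PRE_BIT is a made-up bit value and SortableJob and compareJobs are hypothetical names; real jobs are functions and their id is read through getId.

```ts
// Hypothetical bit value for illustration only.
const PRE_BIT = 1 << 1

interface SortableJob {
  id: number
  flags: number
  label: string
}

// Orders by id; for equal ids, a PRE job sorts before a non-PRE job.
const compareJobs = (a: SortableJob, b: SortableJob): number => {
  const diff = a.id - b.id
  if (diff === 0) {
    const isAPre = a.flags & PRE_BIT
    const isBPre = b.flags & PRE_BIT
    if (isAPre && !isBPre) return -1
    if (isBPre && !isAPre) return 1
  }
  return diff
}

const unsorted: SortableJob[] = [
  { id: 2, flags: 0, label: 'render (id 2)' },
  { id: 1, flags: 0, label: 'render (id 1)' },
  { id: 2, flags: PRE_BIT, label: 'pre job (id 2)' },
]
const sortedLabels = unsorted.slice().sort(compareJobs).map(j => j.label)
console.log(sortedLabels)
// ['render (id 1)', 'pre job (id 2)', 'render (id 2)']
```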

Then, they are executed in order.

try {
  for (let i = 0; i < queue!.length; i++) {
    queue[i]()
    queue[i].flags! &= ~SchedulerJobFlags.QUEUED
  }

In the finally block, it also executes flushPostFlushCbs, and finally, it checks queue and pendingPostFlushCbs again; if there are still jobs remaining, it recursively calls flushJobs again.

} finally {
  flushIndex = 0
  queue.length = 0

  flushPostFlushCbs()

  isFlushing = false
  currentFlushPromise = null
  // some postFlushCb queued jobs!
  // keep flushing until it drains.
  if (queue.length || pendingPostFlushCbs.length) {
    flushJobs()
  }
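
The "keep flushing until it drains" step matters when a post-flush callback queues new work. A minimal sketch of that loop, with hypothetical names (mainJobs, postJobs, drain) and without sorting, flags, or Promises:

```ts
type DrainJob = () => void

const mainJobs: DrainJob[] = []
const postJobs: DrainJob[] = []
const trace: string[] = []

function drain(): void {
  try {
    for (let i = 0; i < mainJobs.length; i++) mainJobs[i]()
  } finally {
    mainJobs.length = 0
    const cbs = postJobs.splice(0)
    for (const cb of cbs) cb()
    // A post-flush callback may have queued more work: flush again.
    if (mainJobs.length || postJobs.length) drain()
  }
}

mainJobs.push(() => {
  trace.push('job 1')
})
postJobs.push(() => {
  trace.push('post 1')
  mainJobs.push(() => {
    trace.push('job 2 (queued by post 1)')
  })
})

drain()
console.log(trace) // ['job 1', 'post 1', 'job 2 (queued by post 1)']
```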

queuePostFlushCb

Here the target is pendingPostFlushCbs, but the basic flow is the same as queueJob.

export function queuePostFlushCb(cb: SchedulerJobs): void {
  if (!isArray(cb)) {
    if (!(cb.flags! & SchedulerJobFlags.QUEUED)) {
      pendingPostFlushCbs.push(cb)
      if (!(cb.flags! & SchedulerJobFlags.ALLOW_RECURSE)) {
        cb.flags! |= SchedulerJobFlags.QUEUED
      }
    }
  } else {
    // if cb is an array, it is a component lifecycle hook which can only be
    // triggered by a job, which is already deduped in the main queue, so
    // we can skip duplicate check here to improve perf
    pendingPostFlushCbs.push(...cb)
  }
  queueFlush()
}

Note that queuing here also ends with queueFlush, so the flush that follows consumes the main queue as well.

flushPostFlushCbs

export function flushPostFlushCbs(): void {
  if (!pendingPostFlushCbs.length) return

  const deduped = [...new Set(pendingPostFlushCbs)]
  pendingPostFlushCbs.length = 0

  // #1947 already has active queue, nested flushPostFlushCbs call
  if (activePostFlushCbs) {
    activePostFlushCbs.push(...deduped)
    return
  }

  activePostFlushCbs = deduped

  activePostFlushCbs.sort((a, b) => getId(a) - getId(b))

  for (
    postFlushIndex = 0;
    postFlushIndex < activePostFlushCbs.length;
    postFlushIndex++
  ) {
    activePostFlushCbs[postFlushIndex]()
    activePostFlushCbs[postFlushIndex].flags! &= ~SchedulerJobFlags.QUEUED
  }
  activePostFlushCbs = null
  postFlushIndex = 0
}

It removes duplicates with new Set, sorts the callbacks by id, and then executes them in order.
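
The dedupe-then-sort step can be illustrated on its own. In this sketch PostJob and makePost are hypothetical helpers, every callback is given an id, and the nested-flush branch is left out.

```ts
interface PostJob {
  (): void
  id?: number
}

const ran: string[] = []

// Hypothetical helper that builds a callback carrying an id.
const makePost = (label: string, id: number): PostJob => {
  const cb: PostJob = () => {
    ran.push(label)
  }
  cb.id = id
  return cb
}

const updatedChild = makePost('child updated', 2)
const updatedParent = makePost('parent updated', 1)

// The same function reference is queued twice; a Set keeps only one copy.
const pendingCbs: PostJob[] = [updatedChild, updatedParent, updatedChild]

const dedupedCbs = Array.from(new Set(pendingCbs))
dedupedCbs.sort((a, b) => (a.id ?? 0) - (b.id ?? 0))
for (const cb of dedupedCbs) cb()

console.log(ran) // ['parent updated', 'child updated']
```

Note that a Set deduplicates by function identity, so two distinct closures with the same body would both run.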

ReactiveEffect and Scheduler

Now, there is one more aspect of the scheduler that we need to understand.

In the queueJob section above, we noted that what the call sites have in common is that they are all set in effect.scheduler.
This is the part we look at now.

The scheduler option of an effect is used to wrap the processing in a call to queueJob.
So, what exactly is this effect.scheduler?

effect is an instance of ReactiveEffect.

export class ReactiveEffect<T = any>

It receives the function (fn) that you want to execute and creates an instance.

constructor(public fn: () => T) {

And there are two methods to execute a ReactiveEffect:
run and trigger.

trigger(): void {

The run method can be executed at any desired time, such as:

ts
const effect = new ReactiveEffect(() => console.log("effect"));
effect.run();

It is also executed via this run during the initial execution of renderEffect.

const effect = new ReactiveEffect(() =>
  callWithAsyncErrorHandling(cb, instance, VaporErrorCodes.RENDER_FUNCTION),
)

effect.scheduler = () => queueJob(job)
if (__DEV__ && instance) {
  effect.onTrack = instance.rtc
    ? e => invokeArrayFns(instance.rtc!, e)
    : void 0
  effect.onTrigger = instance.rtg
    ? e => invokeArrayFns(instance.rtg!, e)
    : void 0
}
effect.run()

On the other hand, trigger is primarily called by the reactivity system when a dependency changes.
For example, when the setter of a ref object is called.

set value(newValue) {

this.dep.trigger()

trigger(debugInfo?: DebuggerEventExtraInfo): void {
  this.version++
  globalVersion++
  this.notify(debugInfo)
}

for (let link = this.subs; link; link = link.prevSub) {
  link.sub.notify()
}

return this.trigger()

When looking at the trigger function,

trigger(): void {
  if (this.flags & EffectFlags.PAUSED) {
    pausedQueueEffects.add(this)
  } else if (this.scheduler) {
    this.scheduler()
  } else {
    this.runIfDirty()
  }
}

if the effect has a scheduler, that scheduler is called instead of running the effect directly.

This mechanism lets the owner of an effect decide when a triggered effect actually runs.
By setting the scheduler property to a function that enqueues the work, triggered effects can be batched by the scheduler instead of running on every dependency change, which avoids unnecessary executions.
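
The difference between the two paths can be sketched with a stripped-down class. MiniEffect is a hypothetical stand-in for ReactiveEffect that omits the PAUSED check and the dirty check, and the deferred array plays the role of the scheduler queue.

```ts
class MiniEffect {
  fn: () => void
  scheduler?: () => void

  constructor(fn: () => void) {
    this.fn = fn
  }

  run(): void {
    this.fn()
  }

  // If a scheduler is set, it decides what happens; otherwise run directly.
  trigger(): void {
    if (this.scheduler) {
      this.scheduler()
    } else {
      this.run()
    }
  }
}

const calls: string[] = []
const deferred: Array<() => void> = []

const immediate = new MiniEffect(() => {
  calls.push('immediate ran')
})
immediate.trigger() // no scheduler: runs right away

const scheduled = new MiniEffect(() => {
  calls.push('scheduled ran')
})
scheduled.scheduler = () => {
  deferred.push(() => scheduled.run())
}
scheduled.trigger() // scheduler set: the work is only enqueued
console.log(calls) // ['immediate ran']

deferred.forEach(run => run())
console.log(calls) // ['immediate ran', 'scheduled ran']
```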

For example, let's look at the implementation of renderEffect.
It sets () => queueJob(job) as the scheduler.

const effect = new ReactiveEffect(() =>
  callWithAsyncErrorHandling(cb, instance, VaporErrorCodes.RENDER_FUNCTION),
)

effect.scheduler = () => queueJob(job)

And suppose renderEffect is called as follows.

ts
const t0 = template("<p></p>");

const n0 = t0();
const count = ref(0);
const effect = () => setText(n0, count.value);
renderEffect(effect);

This way, the ReactiveEffect that wraps effect is tracked by count, and when count changes, that ReactiveEffect is triggered.
When trigger is called, the scheduler property set internally is executed. In this case it is set to "add the job to the queue" rather than "execute the job", so the job is not executed immediately but handed over to the scheduler.

Now, consider triggering it twice in a row.

ts
const count = ref(0);
const effect = () => setText(n0, count.value);
renderEffect(effect);

count.value = 1; // enqueue job
count.value = 2; // enqueue job

Doing so will execute () => queueJob(job) twice.
And recalling the implementation of queueJob,

if (!(job.flags! & SchedulerJobFlags.QUEUED)) {

if the job is already added, it will be ignored.
Since this function executes queueFlush at the end, you might think the queue would be emptied each time, but actually, because it is connected via a Promise, the flush has not yet occurred at this point, and the job remains in the queue.

This achieves deduplication of jobs mediated by the event loop, preventing unnecessary executions.
In fact, consider the following:

ts
count.value = 1;
count.value = 2;

Even when the code is written like this, only the second update,

ts
setText(n0, 2);

is actually executed, and that is enough, because only the final state needs to appear on screen.
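
The whole deduplication flow can be reproduced in a small self-contained sketch. All names here (MiniJob, miniQueueJob, miniFlushJobs, setCount) are hypothetical, a boolean stands in for the QUEUED flag bit, and a plain setter stands in for the ref and its ReactiveEffect.

```ts
interface MiniJob {
  (): void
  queued?: boolean // stand-in for the QUEUED flag bit
}

const jobQueue: MiniJob[] = []
let flushPending = false
const rendered: number[] = []

function miniQueueJob(job: MiniJob): void {
  if (job.queued) return // already queued: ignore the duplicate
  job.queued = true
  jobQueue.push(job)
  if (!flushPending) {
    flushPending = true
    Promise.resolve().then(miniFlushJobs) // flush later, in a microtask
  }
}

function miniFlushJobs(): void {
  flushPending = false
  for (const job of jobQueue.splice(0)) {
    job()
    job.queued = false
  }
}

// The setter does not run the job; it only hands it to the scheduler.
let countValue = 0
const renderJob: MiniJob = () => {
  rendered.push(countValue)
}
const setCount = (v: number): void => {
  countValue = v
  miniQueueJob(renderJob)
}

setCount(1) // enqueues renderJob
setCount(2) // ignored: renderJob is already queued
Promise.resolve().then(() => console.log(rendered)) // [2]
```

The job runs once, after both assignments, and therefore only ever sees the final value.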

With this, you should have a general understanding of the scheduler.
Promises and the queue are used to avoid executing effects unnecessarily.
In addition, a separate queue called pendingPostFlushCbs is used so that lifecycle hooks and similar callbacks run only after screen updates and other work have completed.