Using Schedulers

A scheduler controls when a subscription starts and when notifications are published. It consists of three components. It is first a data structure. When you schedule for tasks to be completed, they are put into the scheduler for queueing based on priority or other criteria. It also offers an execution context which denotes where the task is executed (e.g., in the immediately, current thread, or in another callback mechanism such as setTimeout or process.nextTick). Lastly, it has a clock which provides a notion of time for itself (by accessing the now method of a scheduler). Tasks being scheduled on a particular scheduler will adhere to the time denoted by that clock only.

Schedulers also introduce the notion of virtual time (denoted by the VirtualTimeScheduler type), which does not correlate with real time that is used in our daily life. For example, a sequence that is specified to take 100 years to complete can be scheduled to complete in virtual time in a mere 5 minutes. This will be covered in the Testing and Debugging Observable Sequences topic.

Scheduler Types

The various Scheduler types provided by Rx all implement the Scheduler methods. Each of these can be created and returned by using static properties of the Scheduler object. The ImmediateScheduler (by accessing the static immediate property) will start the specified action immediately. The CurrentThreadScheduler (by accessing the static currentThread property) will schedule actions to be performed on the thread that makes the original call. The action is not executed immediately, but is placed in a queue and only executed after the current action is complete. The DefaultScheduler (by accessing the static default property) will schedule actions to be performed on a asynchronous callback, which is optimized for the particular runtime, such as setImmediate or process.nextTick on Node.js or in the browser with a fallback to setTimeout.

Using Schedulers

You may have already used schedulers in your Rx code without explicitly stating the type of schedulers to be used. This is because all Observable operators that deal with concurrency have optional schedulers. If you do not provide the scheduler, RxJS will pick a default scheduler by using the principle of least concurrency. This means that the scheduler which introduces the least amount of concurrency that satisfies the needs of the operator is chosen. For example, for operators returning an observable with a finite and small number of messages, RxJS calls immediate. For operators returning a potentially large or infinite number of messages, currentThread is called. For operators which use timers, default is used.

Because RxJS uses the least concurrency scheduler, you can pick a different scheduler if you want to introduce concurrency for performance purpose. To specify a particular scheduler, you can use those operator methods that take a scheduler, e.g., return(42, Rx.Scheduler.default).

In the following example, the source observable sequence is producing values at a frantic pace. The default scheduler of the generate operator would place onNext messages on the currentThread.

var obs = Rx.Observable.generate(
    0,
    () => true,
    x => x + 1,
    x => x);

This will queue up on the observer quickly. We can improve this code by using the observeOn operator, which allows you to specify the context that you want to use to send pushed notifications (onNext) to observers. By default, the observeOn operator ensures that onNext will be called as many times as possible on the current thread. You can use its overloads and redirect the onNext outputs to a different context. In addition, you can use the subscribeOn operator to return a proxy observable that delegates actions to a specific scheduler. For example, for a UI-intensive application, you can delegate all background operations to be performed on a scheduler running in the background by using subscribeOn and passing to it the DefaultScheduler.

The following example will schedule any onNext notifications on the current Dispatcher, so that any value pushed out is sent on the UI thread. This is especially beneficial to Silverlight developers who use RxJS.

Rx.Observable.generate(
    0,
    () => true,
    x => x + 1,
    x => x
    )
    .observeOn(Rx.Scheduler.default)
    .subscribe(...);

Instead of using the observeOn operator to change the execution context on which the observable sequence produces messages, we can create concurrency in the right place to begin with. As operators parameterize introduction of concurrency by providing a scheduler argument overload, passing the right scheduler will lead to fewer places where the ObserveOn operator has to be used. For example, we can unblock the observer and subscribe to the UI thread directly by changing the scheduler used by the source, as in the following example. In this code, by using the generate method passing a scheduler, and providing the Rx.Scheduler.default instance, all values pushed out from this observable sequence will originate via an asynchronous callback.

Rx.Observable.generate(
    0,
    () => true,
    x => x + 1,
    x => x,
    Rx.Scheduler.default)
    .subscribe(...);

You should also note that by using the observeOn operator, an action is scheduled for each message that comes through the original observable sequence. This potentially changes timing information as well as puts additional stress on the system. If you have a query that composes various observable sequences running on many different execution contexts, and you are doing filtering in the query, it is best to place observeOn later in the query. This is because a query will potentially filter out a lot of messages, and placing the observeOn operator earlier in the query would do extra work on messages that would be filtered out anyway. Calling the observeOn operator at the end of the query will create the least performance impact.

Another advantage of specifying a scheduler type explicitly is that you can introduce concurrency for performance purpose, as illustrated by the following code.

seq.groupBy(...)
  .map(x => x.observeOn(Rx.Scheduler.default))
  .map(x => expensive(x))  // perform operations that are expensive on resources

When to Use Which Scheduler

To make things a little easier when you are creating your own operators, or using the standard built-in ones, which scheduler you should use. The following table lays out each scenario with the suggested scheduler.

Scenario Scheduler
Constant Time Operations Rx.Scheduler.immediate
Tail Recursive Operations Rx.Scheduler.immediate
Iteration Operations Rx.Scheduler.currentThread
Time-based Operations Rx.Scheduler.default
Asynchronous Conversions Rx.Scheduler.default
Historical Data Operations Rx.HistoricalScheduler
Unit Testing Rx.TestScheduler

相关阅读

Reference