Using Schedulers
A scheduler controls when a subscription starts and when notifications are published. It consists of three components. First, it is a data structure: when you schedule tasks, they are placed in the scheduler's queue, ordered by priority or other criteria. Second, it offers an execution context, which denotes where the task is executed (e.g., immediately, on the current thread, or in another callback mechanism such as setTimeout or process.nextTick). Lastly, it has a clock, which provides the scheduler's own notion of time (accessed through the scheduler's now method). Tasks scheduled on a particular scheduler adhere only to the time denoted by that clock.
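The three components described above can be sketched in plain JavaScript. This is only an illustration under stated assumptions: TinyScheduler, its constructor arguments, and its flush method are hypothetical names invented for this sketch and are not part of RxJS.

```javascript
// Illustrative sketch only; TinyScheduler is a hypothetical name, not an RxJS type.
class TinyScheduler {
  constructor(clock, execute) {
    this.clock = clock;     // clock: function returning the scheduler's current time
    this.execute = execute; // execution context: decides where and how a task runs
    this.queue = [];        // data structure: pending tasks ordered by due time
  }

  // The scheduler's own notion of time.
  now() {
    return this.clock();
  }

  // Queue a task to run `delay` time units from now.
  schedule(delay, task) {
    this.queue.push({ dueTime: this.now() + delay, task });
    // Keep the earliest due task at the front of the queue.
    this.queue.sort((a, b) => a.dueTime - b.dueTime);
  }

  // Run every task whose due time has been reached on this scheduler's clock.
  flush() {
    while (this.queue.length > 0 && this.queue[0].dueTime <= this.now()) {
      const { task } = this.queue.shift();
      this.execute(task);
    }
  }
}

// Usage with a hand-controlled clock and a synchronous execution context.
let time = 0;
const log = [];
const scheduler = new TinyScheduler(() => time, task => task());

scheduler.schedule(20, () => log.push('second'));
scheduler.schedule(10, () => log.push('first'));

time = 10;
scheduler.flush(); // only the task due at time 10 runs
time = 20;
scheduler.flush(); // now the task due at time 20 runs
```

Because the tasks consult only the clock handed to the scheduler, swapping that clock changes when they run without touching the tasks themselves.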
Schedulers also introduce the notion of virtual time (denoted by the VirtualTimeScheduler type), which does not correlate with real, wall-clock time. For example, a sequence that is specified to take 100 years to complete can be scheduled to complete in virtual time in a mere 5 minutes. This will be covered in the Testing and Debugging Observable Sequences topic.
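To make the idea of virtual time concrete, here is a minimal sketch of a clock that jumps forward to each task's due time instead of waiting. VirtualClockSketch and its methods are hypothetical names for this illustration; this is not how the RxJS VirtualTimeScheduler is implemented.

```javascript
// Illustrative sketch only; VirtualClockSketch is a hypothetical name and is
// not the RxJS VirtualTimeScheduler implementation.
class VirtualClockSketch {
  constructor() {
    this.clock = 0;  // virtual time, in milliseconds
    this.queue = []; // pending tasks with absolute virtual due times
  }

  now() {
    return this.clock;
  }

  schedule(delayMs, task) {
    this.queue.push({ dueTime: this.clock + delayMs, task });
  }

  // Run all tasks in due-time order, jumping the clock forward instead of waiting.
  start() {
    while (this.queue.length > 0) {
      this.queue.sort((a, b) => a.dueTime - b.dueTime);
      const next = this.queue.shift();
      this.clock = next.dueTime; // advance virtual time instantly
      next.task();
    }
  }
}

const HUNDRED_YEARS_MS = 100 * 365 * 24 * 60 * 60 * 1000;
const virtual = new VirtualClockSketch();
const seen = [];

virtual.schedule(HUNDRED_YEARS_MS, () => seen.push('done at ' + virtual.now()));
virtual.schedule(1000, () => seen.push('tick at ' + virtual.now()));
virtual.start(); // completes without waiting in real time
```

The task scheduled 100 years out still observes the correct virtual timestamp, even though no real time passes while the queue drains.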
Scheduler Types
The various Scheduler types provided by Rx all implement the Scheduler methods. Each of these can be created and returned by using static properties of the Scheduler object. The ImmediateScheduler (accessed through the static immediate property) starts the specified action immediately. The CurrentThreadScheduler (accessed through the static currentThread property) schedules actions to be performed on the thread that makes the original call. The action is not executed immediately, but is placed in a queue and executed only after the current action is complete. The DefaultScheduler (accessed through the static default property) schedules actions to be performed on an asynchronous callback that is optimized for the particular runtime, such as setImmediate or process.nextTick on Node.js or in the browser, with a fallback to setTimeout.
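The difference between running an action immediately and queueing it until the current action completes shows up when one action schedules another. The following sketch uses two hypothetical stand-ins, immediateSketch and currentThreadSketch, written for this illustration; they mimic the described behavior and are not the RxJS implementations.

```javascript
// Illustrative sketch only; these objects are hypothetical stand-ins,
// not the RxJS ImmediateScheduler or CurrentThreadScheduler.

// "Immediate" behavior: run the action right away, even when nested.
const immediateSketch = {
  schedule(action) {
    action();
  }
};

// "Current thread" behavior: a trampoline. Nested actions are queued and
// run only after the action that scheduled them has returned.
const currentThreadSketch = {
  queue: null,
  schedule(action) {
    if (this.queue !== null) {
      this.queue.push(action); // an action is already running: defer this one
      return;
    }
    this.queue = [action];
    try {
      while (this.queue.length > 0) {
        this.queue.shift()();
      }
    } finally {
      this.queue = null;
    }
  }
};

// Schedule an outer action that schedules an inner one, and record the order.
function trace(scheduler) {
  const order = [];
  scheduler.schedule(() => {
    order.push('outer start');
    scheduler.schedule(() => order.push('inner'));
    order.push('outer end');
  });
  return order;
}

const immediateOrder = trace(immediateSketch);
// immediateOrder: ['outer start', 'inner', 'outer end']
const currentThreadOrder = trace(currentThreadSketch);
// currentThreadOrder: ['outer start', 'outer end', 'inner']
```

With the immediate stand-in the inner action interrupts the outer one, while the trampoline lets the outer action finish first.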
Using Schedulers
You may have already used schedulers in your Rx code without explicitly stating which scheduler to use. This is because all Observable operators that deal with concurrency take an optional scheduler. If you do not provide one, RxJS picks a default scheduler by using the principle of least concurrency: it chooses the scheduler that introduces the least amount of concurrency while still satisfying the needs of the operator. For example, for operators returning an observable with a finite and small number of messages, RxJS uses immediate. For operators returning a potentially large or infinite number of messages, currentThread is used. For operators that use timers, default is used.
Because RxJS defaults to the scheduler with the least concurrency, you can pick a different scheduler if you want to introduce concurrency for performance purposes. To specify a particular scheduler, use the operator methods that take a scheduler argument, e.g., return(42, Rx.Scheduler.default).
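The pattern of an operator that accepts an optional scheduler and otherwise falls back to the least concurrent choice can be sketched as follows. Everything here is hypothetical and written for illustration: justSketch, immediateSketch, and deferredSketch (with its runPending method) are not RxJS APIs, and deferredSketch only imitates an asynchronous scheduler by holding actions until they are flushed by hand.

```javascript
// Illustrative sketch only; none of these names are RxJS APIs.

// Runs each action synchronously, introducing no concurrency.
const immediateSketch = {
  schedule(action) {
    action();
  }
};

// Holds actions until runPending() is called, imitating delivery
// through a later callback.
const deferredSketch = {
  pending: [],
  schedule(action) {
    this.pending.push(action);
  },
  runPending() {
    while (this.pending.length > 0) {
      this.pending.shift()();
    }
  }
};

// A single-value source whose scheduler argument is optional.
function justSketch(value, scheduler = immediateSketch) {
  return {
    subscribe(onNext) {
      scheduler.schedule(() => onNext(value));
    }
  };
}

const events = [];

// No scheduler given: the value arrives during the subscribe call.
justSketch(42).subscribe(x => events.push('immediate ' + x));
events.push('after first subscribe');

// Explicit scheduler: the value arrives only once the scheduler runs it.
justSketch(42, deferredSketch).subscribe(x => events.push('deferred ' + x));
events.push('after second subscribe');
deferredSketch.runPending();
```

The caller's code is identical in both cases; only the scheduler argument decides whether the value is delivered before or after subscribe returns.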
In the following example, the source observable sequence produces values at a frantic pace. By default, the generate operator places onNext messages on the currentThread scheduler.
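var obs = Rx.Observable.generate(
  0,           // initial state
  () => true,  // condition: always true, so the sequence never ends
  x => x + 1,  // iterate: compute the next state
  x => x);     // result selector: emit the state itself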
This will queue up on the observer quickly. We can improve this code by using the observeOn operator, which lets you specify the context used to send pushed notifications (onNext) to observers. By default, the observeOn operator ensures that onNext will be called as many times as possible on the current thread. You can use its overloads to redirect the onNext outputs to a different context. In addition, you can use the subscribeOn operator to return a proxy observable that delegates actions to a specific scheduler. For example, for a UI-intensive application, you can delegate all background operations to a scheduler running in the background by using subscribeOn and passing it the DefaultScheduler.
The following example schedules all onNext notifications on the default scheduler, so that every value pushed out is delivered to the observer through an asynchronous callback.
Rx.Observable.generate(
    0,
    () => true,
    x => x + 1,
    x => x
  )
  .observeOn(Rx.Scheduler.default)
  .subscribe(...);
Instead of using the observeOn operator to change the execution context on which the observable sequence produces messages, we can create concurrency in the right place to begin with. Because operators parameterize the introduction of concurrency through a scheduler argument, passing the right scheduler leads to fewer places where the observeOn operator has to be used. For example, we can unblock the observer by changing the scheduler used by the source, as in the following example. By calling generate with the Rx.Scheduler.default instance as its scheduler argument, all values pushed out from this observable sequence originate via an asynchronous callback.
Rx.Observable.generate(
    0,
    () => true,
    x => x + 1,
    x => x,
    Rx.Scheduler.default)
  .subscribe(...);
You should also note that the observeOn operator schedules an action for each message that comes through the original observable sequence. This potentially changes timing information and puts additional stress on the system. If you have a query that composes various observable sequences running on many different execution contexts, and the query does filtering, it is best to place observeOn later in the query. A query may filter out a lot of messages, and placing the observeOn operator earlier in the query would do extra work on messages that are filtered out anyway. Calling the observeOn operator at the end of the query has the least performance impact.
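The cost difference can be made concrete by counting scheduled actions. The sketch below uses hypothetical stand-ins (observeOnSketch, filterSketch, makeCountingScheduler, and run) built from plain callbacks, so it models only the ordering argument and is not the RxJS operators themselves. The source size of 1000 and the predicate are arbitrary values chosen for the illustration.

```javascript
// Illustrative sketch only; these helpers are hypothetical and model just
// the number of actions an observeOn-like stage would schedule.

function makeCountingScheduler() {
  return {
    scheduledCount: 0,
    schedule(action) {
      this.scheduledCount += 1; // one scheduled action per message
      action();
    }
  };
}

// Each stage takes the downstream callback and returns the upstream callback.
const observeOnSketch = scheduler => next => value =>
  scheduler.schedule(() => next(value));
const filterSketch = predicate => next => value => {
  if (predicate(value)) next(value);
};

// Push `count` values through the stages, in the order listed.
function run(stages, count) {
  const received = [];
  const entry = stages.reduceRight(
    (next, stage) => stage(next),
    value => received.push(value)
  );
  for (let i = 0; i < count; i++) entry(i);
  return received;
}

const isMultipleOf100 = x => x % 100 === 0;

// observeOn placed before the filter: every message is scheduled.
const early = makeCountingScheduler();
const earlyResult = run([observeOnSketch(early), filterSketch(isMultipleOf100)], 1000);

// observeOn placed after the filter: only surviving messages are scheduled.
const late = makeCountingScheduler();
const lateResult = run([filterSketch(isMultipleOf100), observeOnSketch(late)], 1000);

// early.scheduledCount is 1000, late.scheduledCount is 10
```

Both orderings deliver the same ten values to the observer, but the early placement schedules one hundred times as many actions to get there.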
Another advantage of specifying a scheduler type explicitly is that you can introduce concurrency for performance purposes, as illustrated by the following code.
seq.groupBy(...)
  .map(x => x.observeOn(Rx.Scheduler.default))
  .map(x => expensive(x)) // perform operations that are expensive on resources
When to Use Which Scheduler
To make things a little easier when you are creating your own operators, or using the standard built-in ones, the following table lists the suggested scheduler for each scenario.
Scenario | Scheduler |
---|---|
Constant Time Operations | Rx.Scheduler.immediate |
Tail Recursive Operations | Rx.Scheduler.immediate |
Iteration Operations | Rx.Scheduler.currentThread |
Time-based Operations | Rx.Scheduler.default |
Asynchronous Conversions | Rx.Scheduler.default |
Historical Data Operations | Rx.HistoricalScheduler |
Unit Testing | Rx.TestScheduler |