桥接回调

除了事件之外,还有其他的异步数据源存在在web端和服务端。其中之一就是 node.js 中频繁用到的回调模式。这种设计模式中,传递函数作为参数,然后回调通常是最后一个参数,当执行时,通过数据传递将内部作用域的控制权传给回调。Node.js 有标准的方式实现回调,回调被调用的时候如果有错误,将会首先返回 Error 对象,否则返回 null, 然后是回调附加额外的参数。

将回调转换成数据流

Node.js 中的许多异步方法和 JavaScript APIs以这样一种方式编写,它有一个回调作为最后一个参数。这些标准的回调会带着传递给它的数据被调用。我们可以使用 Rx.Observable.fromCallback 去封装这种回调。 注意,这不包括 Node.js 风格回调,Error 作为第一个参数。对于那种操作, 我们提供 Rx.Observable.fromNodeCallback 去包含上面这种情况。

下面的例子,我们将会将 Node.js 的 fs.exists 函数进行转换。 这种函数传入一个 path,然后返回一个 true 或者 false 值来表示文件是否存在,这个例子中,我们会检测 'file.txt' 是否存在。 当被 Rx.Observable.fromCallback 封装返回的参数,将包含传递给回调的参数的数组。

var Rx = require('rx'),
    fs = require('fs');

// 封装 exists 方法
var exists = Rx.Observable.fromCallback(fs.exists);

var source = exists('file.txt');

// 获取第一个参数 true/false
var subscription = source.subscribe(
    x => console.log('onNext: %s', x),
    e => console.log('onError: %s', e),
    () => console.log('onCompleted'));

// => onNext: true
// => onCompleted

将 Node.js 风格的 Callbacks 转换为数据流

Node.js采用了公约在许多回调地方可能出现错误,像是文件 I/O, 网络请求等。RxJS 支持这个约定通过 Rx.Observable.fromNodeCallback 方法处理错误,如果出现,错误会被捕获并通过 onError 发送通知。否则,onNext会发送回调参数,然后是 onCompleted 通知。

下面这个例子,我们将 Node.js fs.rename 函数转换成数据流。

var fs = require('fs'),
    Rx = require('rx');

// Wrap fs.rename
var rename = Rx.Observable.fromNodeCallback(fs.rename);

// 重命名文件,除了错误,没有参数返回
var source = rename('file1.txt', 'file2.txt');

var subscription = source.subscribe(
    x => console.log('onNext: success!'),
    e => console.log('onError: %s', e),
    () => console.log('onCompleted'));

// => onNext: success!
// => onCompleted

将数据流转换成 Callbacks

我们可以很轻易地将数据流转换成 callback. 这当然需要观察到的序列产生只有一个值,这是有道理的. 让我们将 timer 方法转换后来实现等待一段时间。toCallback 的实现看起来像下面这样,注意,这没有包含在 RxJS 中,但需要的话你可以很简单地添加它。

Rx.Observable.prototype.toCallback = cb => {
  var source = this;
  return () => {
    var val, hasVal = false;
    source.subscribe(
      x=> { hasVal = true; val = x; },
      e => throw e, // Default error handling
      () => hasVal && cb(val)}
    );
  };
};

然后,我们可以执行我们的命令就像下面这样:

function cb (x) { console.log('hi!'); }

setTimeout(
  Rx.Observable.timer(5000)
    .toCallback(cb)
  , 500);

将可观察对象转换成 Node.js 风格的 Callbacks

同样地你可能也希望使用 Node.js 风格的 callback。 同样的,就像上面那样,也限制有一个单一的值和结束。toNodeCallback的实现像下面这样。注意,这没有包含在 RxJS 中,但需要的话你可以很简单地添加它。

Rx.Observable.prototype.toNodeCallback = cb => {
  var source = this;
  return () => {
    var val, hasVal = false;
    source.subscribe(
      x => { hasVal = true; val = x; },
      e => cb(e),
      () => hasVal && cb(null, val)}
    );
  };
};

我们可以这样用,例如如果我们有一个观察序列,调用时可以获取一个值,然后将其转换为Node.js的风格。

getData().toNodeCallback((err, data) => {
    if (err) { throw err; }
    // Do something with the data
});