使用RXJS组合、过滤和映射数据流的值

使用RXJS组合、过滤和映射数据流的值

查询可观测序列

事件桥接 一文中,我们将现有的DOM和Node.js事件转换成可观察的序列以订阅它们。在本主题中,我们将把可观察序列的父级class视为IObservable对象,其中Rx组件提供通用LINQ操作符来操作这些对象。大多数操作符获取可观察的序列并对其执行一些逻辑并输出另一个可观测序列。另外,从代码示例可以看出,甚至可以在源序列上使用多个运算符,最终将结果序列调整到您的确切需求。

本文翻译在: RXJS中文文档

使用不同的运算符

我们已经在以前的主题中使用createrange运算符来创建和返回简单的序列。我们还使用fromEventfromEventPattern运算符将现有事件转换成可观察的序列。在本主题中,我们将使用其他Observable类型的运算符,以便可以过滤,分组和转换数据。这些运算符将可观察到的序列作为输入,并生成输出另一个可观察序列。

组合不同序列

在本节中,我们会研究将各种可观察序列组合成单个可观察序列的操作符。请注意,当我们组合序列时,数据不会被转换。在以下示例中,我们使用Concat运算符将两个序列组合成一个序列并订阅它。为了说明的目的,我们将使用非常简单的range(x, y)运算符创建一个从x开始的整数序列,然后产生y个序列数字。

var source1 = Rx.Observable.range(1, 3);
var source2 = Rx.Observable.range(1, 3);

source1.concat(source2)
   .subscribe(console.log.bind(console));

// => 1
// => 2
// => 3
// => 1
// => 2
// => 3

注意,结果序列是1,2,3,1,2,3。这是因为当您使用concat运算符时,第二个序列(source2)将在第一个序列(source1)完成推送其所有值之后才会激活。只有在source1完成之后,source2才会将值推送到最后的序列。然后,订阅者将从得到的序列中获取所有值。

merge 操作符进行比较。如果运行以下示例代码,您将获得1,1,2,2,3,3。这是因为两个序列同时处于活动状态,并且值在数据源中发生时被推出。结果序列仅在最后一个数据源完成推送值时完成。

var source1 = Rx.Observable.range(1, 3);
var source2 = Rx.Observable.range(1, 3);

source1.merge(source2)
   .subscribe(console.log.bind(console));

// => 1
// => 1
// => 2
// => 2
// => 3
// => 3

catch 操作符可以进行另一个比较。在这种情况下,如果source1完成没有任何错误,那么source2将不会启动。因此,如果运行以下示例代码,则获得1,2,3因为source2(产生4,5,6))被忽略。

var source1 = Rx.Observable.range(1, 3);
var source2 = Rx.Observable.range(4, 3);

source1.catch(source2)
   .subscribe(console.log.bind(console));

// => 1
// => 2
// => 3

最后,我们来看看onErrorResumeNext。即使由于错误导致source1无法完成,该操作符也将移动到source2。在以下示例中,即使source1表示通过使用throw运算符终止异常的序列,用户将接收source2发布的值(1,2,3)。因此,如果您预期到任何一个源序列产生任何错误,那么使用它onErrorResumeNext来保证用户仍然会收到一些值是更安全的。

var source1 = Rx.Observable.throw(new Error('An error has occurred.'));
var source2 = Rx.Observable.range(1, 3);

source1.onErrorResumeNext(source2)
   .subscribe(console.log.bind(console));

// => 1
// => 2
// => 3

映射

selectmap 操作符将可观察到的一个序列的每个元素转换成另一种形式。

在下面的示例中,我们将一系列字符串映射到一系列表示长度的整数中。

var array = ['Reactive', 'Extensions', 'RxJS'];

var seqString = Rx.Observable.from(array);

var seqNum = seqString.map(x => x.length);

seqNum
   .subscribe(console.log.bind(console));

// => 8
// => 10
// => 4

在以下示例中,我们在“桥接现有事件”主题中看到的事件转换示例的扩展,我们使用selectmap运算符将事件参数投影到x和y点。这样,我们将鼠标移动事件序列转换为可以进一步解析和操作的数据类型,如下一个“过滤”部分所示。

var move = Rx.Observable.fromEvent(document, 'mousemove');

var points = move.map(e => ({x: e.clientX, y: e.clientY }));

points.subscribe(
    pos => console.log('Mouse at point ' + pos.x + ', ' + pos.y));

最后,我们来看看selectMany or flatMap运算符。selectManyflatMap操作符具有许多重载,其中一个就是需要选择器函数作为参数。这个选择器函数是由数据源推出的每个值去调用的。对于每一个值,选择器将其映射成一个迷你的可观察序列。最后,selectMany或者flatMap操作符将所有这些迷你序列扁平化成单个结果序列,然后将其推送到用户。

在数据源和由选择器函数产生的所有迷你可观察序列都已经完成之后,源序列返回selectManyflatMap发布的onCompleted。当发生源数据流中的错误时触发onError,当一个异常被选择函数抛出,或者当在任何迷你观察序列的发生了错误。

在下面的例子中,我们首先创建一个数据源序列,每5秒产生一个整数,并决定使用生成的前两个值(使用take运算符)。然后,我们使用selectMany或者flatMap对另一个序列{100,101,102}这些整数进行映射。通过这样做,产生两个迷你观察序列{100,101,102}和{100,101,102}。它们最终平坦化成{100,101,102,100,101,102}的单个整数流,并被推送到观察者。

var source1 = Rx.Observable.interval(5000).take(2);
var proj = Rx.Observable.range(100, 3);
var resultSeq = source1.flatMap(proj);

var subscription = resultSeq.subscribe(
  x => console.log('onNext: %s', x),
  e => console.log('onError: %s', e.message),
  () => console.log('onCompleted'));

// => onNext: 100
// => onNext: 101
// => onNext: 102
// => onNext: 100
// => onNext: 101
// => onNext: 102
// => onCompleted

过滤

在下面的例子中,我们使用generate 运算符创建一个简单的可观察数字序列。该generate操作符有几个版本,包括有相对和绝对时间调度。在我们的示例中,它需要初始状态(在我们的示例中为0),一个条件函数终止(少于10次),迭代器(+1),结果选择器(当前值的平方函数))和打印只使用小于5的那些使用filterwhere运算符。

var seq = Rx.Observable.generate(
    0,
    i => i < 10,
    i => i + 1,
    i => i * i);

var source = seq.filter(n => n < 5);

var subscription = source.subscribe(
  x => console.log('onNext: %s', x),
  e => console.log('onError: %s', e.message),
  () => console.log('onCompleted'));

// => onNext: 0
// => onNext: 1
// => onNext: 4
// => onCompleted

以下示例是本主题前面已经看到的映射示例的扩展。在该示例中,我们使用selectmap运算符将事件参数投影到具有x和y的点。在下面的例子中,我们使用filterwhereselectmap操作符只挑选那些鼠标移动,我们感兴趣的是,在这种情况下,我们鼠标移动过滤,以找出在第一平分线(其中x和y坐标是相等的)。

var move = Rx.Observable.fromEvent(document, 'mousemove');

var points = move.map(e => ({ x: e.clientX, y: e.clientY }));

var overfirstbisector = points.filter(pos => pos.x === pos.y);

var movesub = overfirstbisector.subscribe(pos => console.log('mouse at ' + pos.x + ', ' pos.y));

基于时间的操作

您可以使用缓冲区运算符执行基于时间的操作。

缓冲可观察序列意味着可观测序列的值基于指定的时间段或计数阈值被放入缓冲区。这在您预期有大量数据被序列推出的情况下特别有用,并且订阅者没有资源来处理这些值。通过基于时间或计数缓冲结果,并且只有在超过条件时才返回值序列(或者源序列完成时),用户可以按照自己的速度处理OnNext调用。

在下面的例子中,我们首先创建一个以每秒为时间单位的简单的整数序列。然后我们使用bufferWithCount运算符,并指定每个缓冲区将保存序列中的5个项目。在onNext当缓冲区已满被调用。然后我们使用缓冲区的总和Array.reduce。缓冲区自动刷新,另一个循环开始。打印输出将为10,35,60 …,其中10 = 0 + 1 + 2 + 3 + 4,35 = 5 + 6 + 7 + 8 + 9等。

var seq = Rx.Observable.interval(1000);

var bufSeq = seq.bufferWithCount(5);

bufSeq
    .map(arr => arr.reduce((acc, x) => acc + x, 0))
    .subscribe(console.log.bind(console));

// => 10
// => 35
// => 60
...

我们还可以创建一个指定时间跨度(以毫秒为单位)的缓冲区。在以下示例中,缓冲区将保存累积3秒钟的项目。打印输出将为3,12,21 …其中3 = 0 + 1 + 2,12 = 3 + 4 + 5,依此类推。

var seq = Rx.Observable.interval(1000);

var bufSeq = seq.bufferWithTime(3000);

bufSeq
    .map(arr => arr.reduce((acc, x) => acc + x, 0))
    .subscribe(console.log.bind(console));

请注意,如果您使用任何一个buffer*window*运算符,则必须确保序列不为空,然后再过滤。

按类别操作

类别划分的操作符主题列出了按类别实施的Observable的所有主要操作符; 具体来说:创建,转换,合并,功能,数学,时间,异常,杂项,选择和原值。

0%