初识rxjs

Posted by Shen Chaoran on October 13, 2017

响应式编程

在响应式编程中,我们把任何可以使用的数据源看做事件流万物皆stream。响应式编程是围绕数据的流动和传播的,某个变量的变化会导致其他变量的变化。数据从生产者出发,经过一系列管道操作符的处理后,发送给消费者。

优点

  • 在思考的维度上加入时间考量。
  • 把数据的产生和处理分开,可以用同样的方式处理不同来源的数据。
  • 基于推的消息订阅与发布模型。
  • 可以使用同样的方式处理同步和异步编程。

Observer

相当于事件流的消费者,有时又称为Subscriber Observer暴露了三个函数:next,complete,error。Observer的结构如下:

let observer = {
  next: function() {},
  error: function() {},
  complete: function() {}
}

Observable

相当于数据的生产者,负责推送事件,但不处理事件。 相当于发生于未来的异步事件流数组 约定:rx的Observable名字都是一个stream,变量命名以$结尾。

Observable.subscribe(Observer); 官网说明:

In ReactiveX an observer subscribes to an Observable. Then that observer reacts to whatever item or sequence of items the Observable emits.

Observable.subscribe 实现了重载,可以传入一个 observer,也可以(通常)只传 next 函数

Observable可以理解为一个类:

class Observable {
    // 数据生产
    constructor(dataStream) {

    }

    // 数据消费
    subscribe(observer) {
        observer.next(someData)
        observer.complete('end of stream')
        // ...
        return {
            unsubscribe: () => {}
        };
    }
}

与 Promise 的区别

  • 可以发送多个值
  • 可以取消:unsubscribe
  • 具有处理数组的一系列操作符

创建

官网有一些内置的方法来创建Observable,

  • create
  • of
  • from
  • fromEvent
  • fromPromise
  • empty
  • never
  • throw
  • interval
  • timer

冷和热

  • 冷:录播(点播
  • 热:直播

我们将数据按照两个维度划分为田字表:单值与多值、同步与异步。Observable可以包装这四类数据。

单值,同步

Rx.Observable.of(2017)

多值,同步

Rx.Observable.from([1, 2, 3])

单值,异步

const one = new Promise((resolve, reject) => {
  setTimeout(() => {
    resolve(1);
  }, 3000);//模拟3秒延迟
});
//这个时候Promise中的代码已经在运行了

Rx.Observable.fromPromise(one)
  .map(value => value = value + 1);
  .subscribe(result => {
    console.log(result);
  });

多值,异步

Rx.Observable.fromEvent(document.getElementById('btn'), 'click');

Subject

既可以作为Observable又可以作为Observer:一个Subject可以订阅一个Observable,就像一个观察者;并且它可以发射新的数据,或者传递它接受到的数据,就像一个Observable。

// 创建一个Observable,一秒钟输出一个数字,只取三个就结束
var source = Rx.Observable.interval(1000).take(3);

// 定义两个observer对象
var observerA = {
    next: value => console.log('A next: ' + value),
    error: error => console.log('A error: ' + error),
    complete: () => console.log('A complete!')
}

var observerB = {
    next: value => console.log('B next: ' + value),
    error: error => console.log('B error: ' + error),
    complete: () => console.log('B complete!')
}

// 创建一个subject —— 特殊的Observable
var subject = new Rx.Subject()

// observerA订阅Subject
subject.subscribe(observerA)

// Subject又以observer的身份订阅Observable
source.subscribe(subject);

setTimeout(() => {
    subject.subscribe(observerB);
}, 1000);

// 输出:
// "A next: 0"
// "A next: 1"
// "B next: 1"
// "A next: 2"
// "B next: 2"
// "A complete!"
// "B complete!"
// A、B两个observer互不影响,是独立的

作为Observable,与普通的Observable的区别是,他可以多路推送

const subject = new Rx.ReplaySubject(1);
subject.next(100);
subject.next(100);
subject.subscribe(console.log);
  • Subject: hot Observable
  • ReplaySubject: 可以重播指定参数个数的事件流
  • BehaviorSubject:可以重播最后的一个事件流,相当于ReplaySubject(1)

Subscription

对于一些需要释放资源的Observable(如通过interval,timer创建的),可以通过subscription.unsubscribe来释放。

操作符

又叫管道

分类

  • 过滤类: filter, take, first, last, skip, distinct, distinctUntilChanged
  • 工具类: do, delay, toPromise, toArray
  • 时间类: timer(setTimeout), interval, delay, timeInterval(计时操作符), debounceTime(去抖动:一段时间内,只取最新数据进行发射,其他数据取消发射。适用于input的keyup), throttleTime(截流器:每当源Observable数据集发射一个数据项时,会等待n秒,n秒后输出源Observable的最新值。适用于mousemove), throttle, debounce, buffer, bufferCount…
  • 1:1效果:
  • 1:N效果:concat, concatAll, concatMap, concatMapTo, merge, mergeAll, mergeMap, mergeMapTo, switchMap,switchMapTo
  • N:1效果:buffer, bufferCount, bufferTime, bufferWhen
  • 错误处理:catch, retry, retryWhen
  • 判断:
  • 统计:
  • 其他:

详解

  • startWith: 设置发射的第一个值
  • combineLatest: 函数将两个流作为输入,并且当其中任意一个流发射之后, combineLatest 都会组合两个流中最新的值 a 和 b然后输出一个新的流。流的值为 c = f(x,y) 其中 f(x, y) 是传入的自定义函数,配合上时序图更好理解:
      流 A:     --a-----------e--------i-------->
      流 B:     -----b----c--------d-------q---->
                vvvvvvvv combineLatest(f) vvvvvvv
                ----AB---AC--EC---ED--ID--IQ---->
    
          这里的函数f,将输入的字符串变为大写
    
  • mapTo: 改为静态值
  • scan: 类似于 reduce,但保存reduce的中间值,返回的是数组
  • pluck: 类似于 lodash 中的 pluck

map, flatMap, switchMap, concatMap的区别:

  • map
      let stream = Observable.interval(1000).take(10);
      return stream.map(n => n * 2);
    
  • flatMap(别名mergeMap):将Observable打平,返回的是新Observable发送的数据,而不是Observable。
      let stream = Observable.interval(1000).take(10);
      return stream.flatMap(Observable.of);
      // return stream.flatMap(v => Observable.of(v));
    

    如果用map,则map的迭代器函数返回的是一个Observable,如果用flatMap,则会将这个Observable打平,返回他包装的数据。

  • switchMap(别名flatMapLatest):使用flatMap就可以直接获取到新的Observable返回的数据。但是这里存在一个问题,如果用户有多次输入,由于网络原因可能会发生前一次响应时间比后一次长的情况,这时后一次的结果就被覆盖了。 switchMap可以解决这个问题。如果之前的Observable还没有未触发,而又收到了新的Observable,switchMap会取消(订阅,并不会取消发布)之前的Observable,只处理最新收到的Observable,这样就保证了处理请求的先后顺序。
      Observable
          .fromEvent($input, 'keyup')
          .switchMap(text => getHttpResponse(text))
          .subscribe(data => console.log(data))
    
  • concatMap vs flatMap: ```javascript // rxjs v6 import { of } from ‘rxjs’; import { map, concatMap, delay, mergeMap } from ‘rxjs/operators’;

const startTime = new Date().getTime() //emit delay value const source = of(2000, 1000); // map value from source into inner observable, when complete emit result and move to next const example = source.pipe( concatMap(val => of(Delayed by: ${val}ms).pipe(delay(val))), map(v => { let now = new Date().getTime() console.log(‘——’, now-startTime) }) ); //output: With concatMap: Delayed by: 2000ms, With concatMap: Delayed by: 1000ms const subscribe = example.subscribe(val => console.log(With concatMap: ${val}) );

// showing the difference between concatMap and mergeMap const mergeMapExample = source .pipe( // just so we can log this after the first example has run delay(5000), mergeMap(val => of(Delayed by: ${val}ms).pipe(delay(val))), map(v => { let now = new Date().getTime() console.log(‘——’, now-startTime) }) ) .subscribe(val => console.log(With mergeMap: ${val}));


## distinct和distinctUntilChanged

- distinct:所有的事件流相比
- distinctUntilChanged:和前一个相比
![](../_site/img/in-post/rxjs/操作符/1.png)

## merge,concat和startWith

- merge:严格按照时间顺序合并
- concat:合并的流不关注时间顺序,只是将两个流按照数组连接起来。
- startWith:在流的最前面添加一个值
![](../_site/img/in-post/rxjs/操作符/2.png)

## combinLatest,withLatestFrom和zip

- combinLatest:当有一个流出现新值时重新组合
- withLatestFrom:与上者相比,以源事件流为基准;返回的是一个数组。
- zip:两个流是成对组合的
![](../_site/img/in-post/rxjs/操作符/3.png)

## debounce和debounceTime
- debounce: 参数是一个Observable,更灵活
- debounceTime: 参数是一个时间。超过设定时间的事件流才放行

# Angular中对Rx的支持

- 内置的Observable:Http, ReactiveForms, Route
- Async Pipe:angular内部实现subscribe和unsubscribe

# 案例

## 轮询请求后台

private fetchInterval() { const record$ = interval(2000).pipe( switchMap((v, i) => { return this.service.findAll(this.msRecord._id); }), map(response => { if(!response.error) { return response.data; } }) )

const _subscription = record$.subscribe(doc => {
    this.msRecord = doc;
    this.msRecord.IO.mode = 'read';
    if(this.msRecord.progress === 100) {
        _subscription.unsubscribe();
    }
});

} ```

参考文献


App ready for offline use.