响应式编程
在响应式编程中,我们把任何可以使用的数据源看做事件流
,万物皆stream
。响应式编程是围绕数据的流动和传播
的,某个变量的变化会导致其他变量的变化。数据从生产者出发,经过一系列管道操作符的处理后,发送给消费者。
优点
- 在思考的维度上加入时间考量。
- 把数据的产生和处理分开,可以用同样的方式处理不同来源的数据。
- 基于推的消息订阅与发布模型。
- 可以使用同样的方式处理同步和异步编程。
Observer
相当于事件流的消费者,有时又称为Subscriber
Observer暴露了三个函数:next,complete,error。Observer的结构如下:
let observer = {
next: function() {},
error: function() {},
complete: function() {}
}
Observable
相当于数据的生产者,负责推送事件,但不处理事件。
相当于发生于未来的异步事件流数组
约定:rx的Observable名字都是一个stream,变量命名以$
结尾。
Observable.subscribe(Observer);
官网说明:
In ReactiveX an observer subscribes to an Observable. Then that observer reacts to whatever item or sequence of items the Observable emits.
Observable.subscribe 实现了重载,可以传入一个 observer,也可以(通常)只传 next 函数。
Observable可以理解为一个类:
class Observable {
// 数据生产
constructor(dataStream) {
}
// 数据消费
subscribe(observer) {
observer.next(someData)
observer.complete('end of stream')
// ...
return {
unsubscribe: () => {}
};
}
}
与 Promise 的区别
- 可以发送多个值
- 可以取消:unsubscribe
- 具有处理数组的一系列操作符
创建
官网有一些内置的方法来创建Observable,
- create
- of
- from
- fromEvent
- fromPromise
- empty
- never
- throw
- interval
- timer
冷和热
- 冷:录播(
点播) - 热:直播
我们将数据按照两个维度划分为田字表:单值与多值、同步与异步。Observable可以包装这四类数据。
单值,同步
Rx.Observable.of(2017)
多值,同步
Rx.Observable.from([1, 2, 3])
单值,异步
const one = new Promise((resolve, reject) => {
setTimeout(() => {
resolve(1);
}, 3000);//模拟3秒延迟
});
//这个时候Promise中的代码已经在运行了
Rx.Observable.fromPromise(one)
.map(value => value = value + 1);
.subscribe(result => {
console.log(result);
});
多值,异步
Rx.Observable.fromEvent(document.getElementById('btn'), 'click');
Subject
既可以作为Observable又可以作为Observer:一个Subject可以订阅一个Observable,就像一个观察者;并且它可以发射新的数据,或者传递它接受到的数据,就像一个Observable。
// 创建一个Observable,一秒钟输出一个数字,只取三个就结束
var source = Rx.Observable.interval(1000).take(3);
// 定义两个observer对象
var observerA = {
next: value => console.log('A next: ' + value),
error: error => console.log('A error: ' + error),
complete: () => console.log('A complete!')
}
var observerB = {
next: value => console.log('B next: ' + value),
error: error => console.log('B error: ' + error),
complete: () => console.log('B complete!')
}
// 创建一个subject —— 特殊的Observable
var subject = new Rx.Subject()
// observerA订阅Subject
subject.subscribe(observerA)
// Subject又以observer的身份订阅Observable
source.subscribe(subject);
setTimeout(() => {
subject.subscribe(observerB);
}, 1000);
// 输出:
// "A next: 0"
// "A next: 1"
// "B next: 1"
// "A next: 2"
// "B next: 2"
// "A complete!"
// "B complete!"
// A、B两个observer互不影响,是独立的
作为Observable,与普通的Observable的区别是,他可以多路推送
const subject = new Rx.ReplaySubject(1);
subject.next(100);
subject.next(100);
subject.subscribe(console.log);
- Subject: hot Observable
- ReplaySubject: 可以重播指定参数个数的事件流
- BehaviorSubject:可以重播最后的一个事件流,相当于ReplaySubject(1)
Subscription
对于一些需要释放资源的Observable(如通过interval,timer创建的),可以通过subscription.unsubscribe
来释放。
操作符
又叫管道
分类
- 过滤类: filter, take, first, last, skip, distinct, distinctUntilChanged
- 工具类: do, delay, toPromise, toArray
- 时间类: timer(setTimeout), interval, delay, timeInterval(计时操作符), debounceTime(去抖动:一段时间内,只取最新数据进行发射,其他数据取消发射。适用于input的keyup), throttleTime(截流器:每当源Observable数据集发射一个数据项时,会等待n秒,n秒后输出源Observable的最新值。适用于mousemove), throttle, debounce, buffer, bufferCount…
- 1:1效果:
- 1:N效果:concat, concatAll, concatMap, concatMapTo, merge, mergeAll, mergeMap, mergeMapTo, switchMap,switchMapTo
- N:1效果:buffer, bufferCount, bufferTime, bufferWhen
- 错误处理:catch, retry, retryWhen
- 判断:
- 统计:
- 其他:
详解
- startWith: 设置发射的第一个值
- combineLatest: 函数将两个流作为输入,并且当其中任意一个流发射之后, combineLatest 都会组合两个流中最新的值 a 和 b然后输出一个新的流。流的值为 c = f(x,y) 其中 f(x, y) 是传入的自定义函数,配合上时序图更好理解:
流 A: --a-----------e--------i--------> 流 B: -----b----c--------d-------q----> vvvvvvvv combineLatest(f) vvvvvvv ----AB---AC--EC---ED--ID--IQ----> 这里的函数f,将输入的字符串变为大写
- mapTo: 改为静态值
- scan: 类似于 reduce,但保存reduce的中间值,返回的是数组
- pluck: 类似于 lodash 中的 pluck
map, flatMap, switchMap, concatMap的区别:
- map
let stream = Observable.interval(1000).take(10); return stream.map(n => n * 2);
- flatMap(别名mergeMap):将Observable打平,返回的是新Observable发送的数据,而不是Observable。
let stream = Observable.interval(1000).take(10); return stream.flatMap(Observable.of); // return stream.flatMap(v => Observable.of(v));
如果用map,则map的迭代器函数返回的是一个Observable,如果用flatMap,则会将这个Observable打平,返回他包装的数据。
- switchMap(别名flatMapLatest):使用flatMap就可以直接获取到新的Observable返回的数据。但是这里存在一个问题,如果用户有多次输入,由于网络原因可能会发生前一次响应时间比后一次长的情况,这时后一次的结果就被覆盖了。
switchMap可以解决这个问题。如果之前的Observable还没有未触发,而又收到了新的Observable,switchMap会取消(订阅,并不会取消发布)之前的Observable,只处理最新收到的Observable,这样就保证了处理请求的先后顺序。
Observable .fromEvent($input, 'keyup') .switchMap(text => getHttpResponse(text)) .subscribe(data => console.log(data))
- concatMap vs flatMap: ```javascript // rxjs v6 import { of } from ‘rxjs’; import { map, concatMap, delay, mergeMap } from ‘rxjs/operators’;
const startTime = new Date().getTime()
//emit delay value
const source = of(2000, 1000);
// map value from source into inner observable, when complete emit result and move to next
const example = source.pipe(
concatMap(val => of(Delayed by: ${val}ms
).pipe(delay(val))),
map(v => {
let now = new Date().getTime()
console.log(‘——’, now-startTime)
})
);
//output: With concatMap: Delayed by: 2000ms, With concatMap: Delayed by: 1000ms
const subscribe = example.subscribe(val =>
console.log(With concatMap: ${val}
)
);
// showing the difference between concatMap and mergeMap
const mergeMapExample = source
.pipe(
// just so we can log this after the first example has run
delay(5000),
mergeMap(val => of(Delayed by: ${val}ms
).pipe(delay(val))),
map(v => {
let now = new Date().getTime()
console.log(‘——’, now-startTime)
})
)
.subscribe(val => console.log(With mergeMap: ${val}
));
## distinct和distinctUntilChanged
- distinct:所有的事件流相比
- distinctUntilChanged:和前一个相比
![](../_site/img/in-post/rxjs/操作符/1.png)
## merge,concat和startWith
- merge:严格按照时间顺序合并
- concat:合并的流不关注时间顺序,只是将两个流按照数组连接起来。
- startWith:在流的最前面添加一个值
![](../_site/img/in-post/rxjs/操作符/2.png)
## combinLatest,withLatestFrom和zip
- combinLatest:当有一个流出现新值时重新组合
- withLatestFrom:与上者相比,以源事件流为基准;返回的是一个数组。
- zip:两个流是成对组合的
![](../_site/img/in-post/rxjs/操作符/3.png)
## debounce和debounceTime
- debounce: 参数是一个Observable,更灵活
- debounceTime: 参数是一个时间。超过设定时间的事件流才放行
# Angular中对Rx的支持
- 内置的Observable:Http, ReactiveForms, Route
- Async Pipe:angular内部实现subscribe和unsubscribe
# 案例
## 轮询请求后台
private fetchInterval() { const record$ = interval(2000).pipe( switchMap((v, i) => { return this.service.findAll(this.msRecord._id); }), map(response => { if(!response.error) { return response.data; } }) )
const _subscription = record$.subscribe(doc => {
this.msRecord = doc;
this.msRecord.IO.mode = 'read';
if(this.msRecord.progress === 100) {
_subscription.unsubscribe();
}
});
} ```
参考文献
- RxJS 官网Observable
- Observable和Observer
- RxJS 核心概念之Subject
- 时间相关操作符
- 通过RxJS的操作符在Canvas上画图
- 通过RxJS的操作符推送关注人
- 官网宝石图
- 操作符介绍
- RxJS操作符中文文档
- RxJS Observable详解
- 实例解析防抖动(Debouncing)和节流阀(Throttling)