Contents
  1. 1. rxjs
    1. 1.1. 什么事RxJS
    2. 1.2. RxJS主要成员(核心概念)
    3. 1.3. Example
      1. 1.3.1. Observable 创建
      2. 1.3.2. Operator 操作符
      3. 1.3.3. catch() 错误处理

rxjs

什么事RxJS

RXJS 是 ReactiveX编程理念/思想的 JavaScript实现版本. 其它语言里, 如 Java 有对应的 RxJava[https://github.com/ReactiveX/RxJava]

ReactiveX是一种针对异步数据流的编程; ReactiveX 是由微软xx架构师创造.

ReactiveX 参考: http://reactivex.io/intro.html

RxJS主要成员(核心概念)

  • Observable

    数据生产、传播

  • Observer

    数据消费

  • Subscriber

    连接 Observable 和 Observer

  • Operator

    数据流、传播途中对数据值进行操作/转换的操作符

  • Subject

    包含 Observable 和 Observer

Example

1
2
3
4
5
6
7
8
9
mkdir rxjs_exercise
cd rxjs_exercise
npm init
# or -g
npm install --save-dev ts-node
# must
npm install rxjs --save
Observable 创建

e.g. 1 【from】

app.ts

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import { Observable, } from 'rxjs';
import 'rxjs/add/observable/from'; # 操作符 from
let persons = [
{ name: 'Dave', age: 34, salary: 2000 },
{ name: 'Nick', age: 37, salary: 32000 },
{ name: 'Howie', age: 40, salary: 26000 },
{ name: 'Brian', age: 40, salary: 30000 },
{ name: 'Kevin', age: 47, salary: 24000 },
];
let index = 0;
Observable.from(persons).subscribe(
person => {
console.log(++index, person);
},
error => console.log(error),
() => console.log('stream end/// ')
);
1
2
3
4
5
6
7
8
9
10
ts-node app
# 输出结果
1 { name: 'Dave', age: 34, salary: 2000 }
2 { name: 'Nick', age: 37, salary: 32000 }
3 { name: 'Howie', age: 40, salary: 26000 }
4 { name: 'Brian', age: 40, salary: 30000 }
5 { name: 'Kevin', age: 47, salary: 24000 }
stream end///

subscribe方法里有三个方法参数, from 操作符用来读取 person 对象, subscribe里的方法开始执行时, Observable 才开始想它推送 (person)对象数据.

e.g. 2 [create]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import {Observable} from 'rxjs/Observable';
// generateData 对象
function generateData() {
const people = [
{ name: 'David', age: 3, salary: 2000 },
{ name: 'Dav', age: 17, salary: 12000 },
{ name: 'Lomo', age: 20, salary: 16000 },
{ name: 'Dev', age: 20, salary: 10000 },
{ name: 'Amy', age: 7, salary: 34000 },
];
return Observable.create(
observer => {
let i = 0;
people.forEach( p => {
console.log('推送第 ' + (++i) + ' 条数据');
observer.next(p)
});
observer.complete();
}
);
}
generateData().subscribe(
people => {
console.log(people.name + '--' + people.age + '--' + people.salary);
},
error => { console.log(error); },
() => { console.log('消费完了.'); }
);
1
2
3
4
5
6
7
8
9
10
11
12
13
# ts-node create 运行结果如下:
推送第 1 条数据
David--3--2000
推送第 2 条数据
Dav--17--12000
推送第 3 条数据
Lomo--20--16000
推送第 4 条数据
Dev--20--10000
推送第 5 条数据
Amy--7--34000
消费完了.

Observable.create() 方法接受一个 function 方法参数. 通过运行结果发现, 当订阅者每消费一条数据, observable 就会推送一条数据, 并逐次推送对象里的数据给消费者, 直到 complete(), 从而形成 流式.

尝试将 .subscriobe() 方法里的三个方法注释掉,

1
2
3
4
5
6
7
generateData().subscribe(
// people => {
// console.log(people.name + '--' + people.age + '--' + people.salary);
// },
// error => { console.log(error); },
// () => { console.log('消费完了.'); }
);

运行结果:

1
2
3
4
5
推送第 1 条数据
推送第 2 条数据
推送第 3 条数据
推送第 4 条数据
推送第 5 条数据

整个数据流的传递 只有 Observable 在被订阅(.subscribe())消费时才会去推送数据流(不管订阅者是否处理消费, 只要订阅了 就给你推送).

Operator 操作符

常见操作符: of, map, reduce, filter, take, first, Timer, Interval.

操作符本质上是一个 function, 用来处理、加工 Observable 中传递的数据流里的数据. 这个 操作符 function() 输入、输出(返回)均为Observable类型

e.g.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/from';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/reduce';
let persons = [
{ name: 'David', age: 3, salary: 2000 },
{ name: 'Dav', age: 17, salary: 12000 },
{ name: 'Lomo', age: 20, salary: 16000 },
{ name: 'Dev', age: 20, salary: 10000 },
{ name: 'Amy', age: 7, salary: 34000 },
];
// reduce 里的0 是设定的一个默认返回值, 可以不给.
Observable.from(persons).map(p => p.salary).reduce((total, current) => total + current, 0).subscribe(
total => {
console.log(`total salary is: ${total}`)
// console.log(persons);
},
err => console.log(err),
() => console.log('finished.')
);
1
2
3
# ts-node operator
total salary is: 74000
finished.

通过 map 方法获取数据流的所有salary, 并交给reduce 计算获取总和

参考:

https://www.jianshu.com/p/d8cb71554008

https://segmentfault.com/a/1190000008834251

catch() 错误处理

错误处理需要在 数据流 到达 Observer 之前拦截处理。


    
        
        版权声明:
        本文由Lomo创作和发表,采用署名(BY)-非商业性使用(NC)-相同方式共享(SA)国际许可协议进行许可,
        转载请注明作者及出处,本文作者为Lomo,本文标题为rxjs notes.
    
    


 Leave a message ^_^:

Contents
  1. 1. rxjs
    1. 1.1. 什么事RxJS
    2. 1.2. RxJS主要成员(核心概念)
    3. 1.3. Example
      1. 1.3.1. Observable 创建
      2. 1.3.2. Operator 操作符
      3. 1.3.3. catch() 错误处理