主题
RxJS:响应式编程的核心工具库
更新: 9/4/2025字数: 0 字 时长: 0 分钟
RxJS(Reactive Extensions for JavaScript)是JavaScript的响应式编程库,它使用Observable序列来编写异步和基于事件的程序。作为Angular的核心依赖,RxJS提供了强大的数据流处理能力。
一、核心概念
1. Observable(可观察对象)
- 定义:表示一个可调用的未来值或事件的集合
- 特点:
- 惰性执行(只有订阅时才会启动)
- 可以发送多个值(同步或异步)
- 支持取消
typescript
import { Observable } from "rxjs";
const observable = new Observable((subscriber) => {
subscriber.next(1);
subscriber.next(2);
setTimeout(() => {
subscriber.next(3);
subscriber.complete();
}, 1000);
});2. Observer(观察者)
- 包含三个方法的对象:
next(value):接收 Observable 发出的值error(err):处理错误complete():处理完成通知
typescript
const observer = {
next: (x) => console.log("收到值: " + x),
error: (err) => console.error("发生错误: " + err),
complete: () => console.log("已完成")
};3. Subscription(订阅)
- 表示 Observable 的执行
- 用于取消执行
typescript
const subscription = observable.subscribe(observer);
subscription.unsubscribe(); // 取消订阅4. Subject(主体)
- 既是 Observable 又是 Observer
- 多播(多个观察者共享同一个 Observable 执行)
typescript
import { Subject } from "rxjs";
const subject = new Subject<number>();
subject.subscribe({ next: (v) => console.log(`观察者A: ${v}`) });
subject.subscribe({ next: (v) => console.log(`观察者B: ${v}`) });
subject.next(1);
subject.next(2);二、创建 Observable
1. 创建类操作符
| 操作符 | 描述 |
|---|---|
of | 创建一个发出给定参数列表的 Observable |
from | 从数组、Promise 或可迭代对象创建 Observable |
fromEvent | 从 DOM 事件或 Node.js EventEmitter 创建 Observable |
interval | 每隔指定时间发出递增数字 |
timer | 延迟后开始发出值,然后定期发出 |
ajax | 创建 AJAX 请求的 Observable |
typescript
import { of, from, fromEvent, interval } from "rxjs";
// 创建示例
of(1, 2, 3).subscribe(console.log);
from([1, 2, 3]).subscribe(console.log);
fromEvent(document, "click").subscribe(console.log);
interval(1000).subscribe(console.log);三、常用操作符
1. 转换类操作符
| 操作符 | 描述 |
|---|---|
map | 对每个值应用函数进行转换 |
pluck | 提取对象属性 |
scan | 类似 reduce,但每次累加都发出值 |
switchMap | 映射成 Observable,取消前一个内部 Observable 的订阅 |
concatMap | 按顺序映射成 Observable 并连接 |
mergeMap | 映射成 Observable 并合并发出值 |
2. 过滤类操作符
| 操作符 | 描述 |
|---|---|
filter | 只发出满足条件的值 |
take | 只取前 N 个值 |
takeUntil | 直到另一个 Observable 发出值才停止 |
debounceTime | 只在特定时间间隔后没有新值时才发出最新值 |
distinctUntilChanged | 只有当当前值与上一次不同时才发出 |
3. 组合类操作符
| 操作符 | 描述 |
|---|---|
merge | 合并多个 Observable 的输出 |
concat | 按顺序连接多个 Observable |
combineLatest | 当任意输入 Observable 发出值时,组合各 Observable 的最新值 |
withLatestFrom | 类似 combineLatest,但只在源 Observable 发出值时组合 |
zip | 严格按顺序组合多个 Observable 的值 |
4. 错误处理类操作符
| 操作符 | 描述 |
|---|---|
catchError | 捕获错误并返回新的 Observable 或抛出错误 |
retry | 遇到错误时重试指定次数 |
retryWhen | 根据条件决定是否重试 |
四、实际应用示例
1. 自动完成搜索
typescript
import { fromEvent } from "rxjs";
import { debounceTime, distinctUntilChanged, switchMap } from "rxjs/operators";
const searchBox = document.getElementById("search");
const search$ = fromEvent(searchBox, "input").pipe(
debounceTime(300),
map((event) => event.target.value),
distinctUntilChanged(),
switchMap((query) => ajax.getJSON(`/api/search?q=${query}`))
);
search$.subscribe((results) => {
// 更新UI
});2. 多请求并行处理
typescript
import { forkJoin } from "rxjs";
forkJoin({
user: ajax.getJSON("/api/user/1"),
posts: ajax.getJSON("/api/posts?userId=1")
}).subscribe(({ user, posts }) => {
console.log("用户:", user);
console.log("文章:", posts);
});3. 状态管理
typescript
import { BehaviorSubject } from "rxjs";
class Store {
private state = new BehaviorSubject({ count: 0 });
select(key) {
return this.state.pipe(pluck(key), distinctUntilChanged());
}
update(newState) {
this.state.next({ ...this.state.value, ...newState });
}
}
const store = new Store();
store.select("count").subscribe((count) => console.log("当前计数:", count));
store.update({ count: 1 });五、最佳实践
内存管理:
- 及时取消订阅(使用
takeUntil或async管道) - 避免内存泄漏
- 及时取消订阅(使用
错误处理:
- 总是处理错误(
catchError) - 考虑重试策略(
retry、retryWhen)
- 总是处理错误(
性能优化:
- 使用
shareReplay共享 Observable 执行 - 合理使用防抖(
debounceTime)和节流(throttleTime)
- 使用
调试技巧:
- 使用
tap操作符进行日志记录
typescriptobservable.pipe( tap((value) => console.log("收到值:", value)), catchError((err) => { console.error("发生错误:", err); return throwError(err); }) );- 使用
六、与 Promise 对比
| 特性 | Promise | Observable |
|---|---|---|
| 多值 | 只能 resolve 一次 | 可以发出多个值 |
| 取消 | 不可取消 | 可取消 |
| 惰性执行 | 立即执行 | 订阅时执行 |
| 操作符 | 无 | 丰富的操作符 |
| 异步/同步 | 总是异步 | 可同步可异步 |
七、常见问题解答
Q1: 什么时候应该使用 RxJS?
- 处理复杂异步逻辑
- 需要组合多个异步源
- 需要取消异步操作
- 需要响应式编程范式
Q2: Cold Observable 和 Hot Observable 有什么区别?
- Cold:每个订阅都会独立执行(如 HTTP 请求)
- Hot:多个订阅共享同一个执行(如 DOM 事件)
Q3: 如何避免内存泄漏?
- 使用
takeUntil模式取消订阅 - Angular 中使用
async管道自动管理订阅 - 避免在组件中手动订阅而不取消