Skip to content
 

RxJS:响应式编程的核心工具库

更新: 9/4/2025字数: 0 字 时长: 0 分钟

RxJS(Reactive Extensions for JavaScript)JavaScript的响应式编程库,它使用Observable序列来编写异步和基于事件的程序。作为Angular的核心依赖,RxJS提供了强大的数据流处理能力。

一、核心概念

1. Observable(可观察对象)

  • 定义:表示一个可调用的未来值或事件的集合
  • 特点
    • 惰性执行(只有订阅时才会启动)
    • 可以发送多个值(同步或异步)
    • 支持取消
typescript
import { Observable } from "rxjs";

const observable = new Observable((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  setTimeout(() => {
    subscriber.next(3);
    subscriber.complete();
  }, 1000);
});

2. Observer(观察者)

  • 包含三个方法的对象:
    • next(value):接收 Observable 发出的值
    • error(err):处理错误
    • complete():处理完成通知
typescript
const observer = {
  next: (x) => console.log("收到值: " + x),
  error: (err) => console.error("发生错误: " + err),
  complete: () => console.log("已完成")
};

3. Subscription(订阅)

  • 表示 Observable 的执行
  • 用于取消执行
typescript
const subscription = observable.subscribe(observer);
subscription.unsubscribe(); // 取消订阅

4. Subject(主体)

  • 既是 Observable 又是 Observer
  • 多播(多个观察者共享同一个 Observable 执行)
typescript
import { Subject } from "rxjs";

const subject = new Subject<number>();

subject.subscribe({ next: (v) => console.log(`观察者A: ${v}`) });
subject.subscribe({ next: (v) => console.log(`观察者B: ${v}`) });

subject.next(1);
subject.next(2);

二、创建 Observable

1. 创建类操作符

操作符描述
of创建一个发出给定参数列表的 Observable
from从数组、Promise 或可迭代对象创建 Observable
fromEvent从 DOM 事件或 Node.js EventEmitter 创建 Observable
interval每隔指定时间发出递增数字
timer延迟后开始发出值,然后定期发出
ajax创建 AJAX 请求的 Observable
typescript
import { of, from, fromEvent, interval } from "rxjs";

// 创建示例
of(1, 2, 3).subscribe(console.log);
from([1, 2, 3]).subscribe(console.log);
fromEvent(document, "click").subscribe(console.log);
interval(1000).subscribe(console.log);

三、常用操作符

1. 转换类操作符

操作符描述
map对每个值应用函数进行转换
pluck提取对象属性
scan类似 reduce,但每次累加都发出值
switchMap映射成 Observable,取消前一个内部 Observable 的订阅
concatMap按顺序映射成 Observable 并连接
mergeMap映射成 Observable 并合并发出值

2. 过滤类操作符

操作符描述
filter只发出满足条件的值
take只取前 N 个值
takeUntil直到另一个 Observable 发出值才停止
debounceTime只在特定时间间隔后没有新值时才发出最新值
distinctUntilChanged只有当当前值与上一次不同时才发出

3. 组合类操作符

操作符描述
merge合并多个 Observable 的输出
concat按顺序连接多个 Observable
combineLatest当任意输入 Observable 发出值时,组合各 Observable 的最新值
withLatestFrom类似 combineLatest,但只在源 Observable 发出值时组合
zip严格按顺序组合多个 Observable 的值

4. 错误处理类操作符

操作符描述
catchError捕获错误并返回新的 Observable 或抛出错误
retry遇到错误时重试指定次数
retryWhen根据条件决定是否重试

四、实际应用示例

1. 自动完成搜索

typescript
import { fromEvent } from "rxjs";
import { debounceTime, distinctUntilChanged, switchMap } from "rxjs/operators";

const searchBox = document.getElementById("search");
const search$ = fromEvent(searchBox, "input").pipe(
  debounceTime(300),
  map((event) => event.target.value),
  distinctUntilChanged(),
  switchMap((query) => ajax.getJSON(`/api/search?q=${query}`))
);

search$.subscribe((results) => {
  // 更新UI
});

2. 多请求并行处理

typescript
import { forkJoin } from "rxjs";

forkJoin({
  user: ajax.getJSON("/api/user/1"),
  posts: ajax.getJSON("/api/posts?userId=1")
}).subscribe(({ user, posts }) => {
  console.log("用户:", user);
  console.log("文章:", posts);
});

3. 状态管理

typescript
import { BehaviorSubject } from "rxjs";

class Store {
  private state = new BehaviorSubject({ count: 0 });

  select(key) {
    return this.state.pipe(pluck(key), distinctUntilChanged());
  }

  update(newState) {
    this.state.next({ ...this.state.value, ...newState });
  }
}

const store = new Store();
store.select("count").subscribe((count) => console.log("当前计数:", count));
store.update({ count: 1 });

五、最佳实践

  1. 内存管理

    • 及时取消订阅(使用takeUntilasync管道)
    • 避免内存泄漏
  2. 错误处理

    • 总是处理错误(catchError
    • 考虑重试策略(retryretryWhen
  3. 性能优化

    • 使用shareReplay共享 Observable 执行
    • 合理使用防抖(debounceTime)和节流(throttleTime
  4. 调试技巧

    • 使用tap操作符进行日志记录
    typescript
    observable.pipe(
      tap((value) => console.log("收到值:", value)),
      catchError((err) => {
        console.error("发生错误:", err);
        return throwError(err);
      })
    );

六、与 Promise 对比

特性PromiseObservable
多值只能 resolve 一次可以发出多个值
取消不可取消可取消
惰性执行立即执行订阅时执行
操作符丰富的操作符
异步/同步总是异步可同步可异步

七、常见问题解答

Q1: 什么时候应该使用 RxJS?

  • 处理复杂异步逻辑
  • 需要组合多个异步源
  • 需要取消异步操作
  • 需要响应式编程范式

Q2: Cold Observable 和 Hot Observable 有什么区别?

  • Cold:每个订阅都会独立执行(如 HTTP 请求)
  • Hot:多个订阅共享同一个执行(如 DOM 事件)

Q3: 如何避免内存泄漏?

  • 使用takeUntil模式取消订阅
  • Angular 中使用async管道自动管理订阅
  • 避免在组件中手动订阅而不取消

我见青山多妩媚,料青山见我应如是。