网站Logo 开飞机的舒克

Rx.js 详细技术指南

dddd
0
2026-05-21

1. 什么是 Rx.js

Rx.js (Reactive Extensions for JavaScript) 是一个基于观察者模式和迭代器模式的响应式编程库。它将异步事件流(如用户输入、HTTP 请求、WebSocket 消息等)抽象为 Observable,通过强大的操作符进行转换、过滤和组合。

1.1 响应式编程的核心思想

响应式编程是一种面向数据流变化传播的编程范式。在 Rx.js 中:

  • 数据流:所有事件都被视为可观察的序列

  • 声明式编程:描述"什么"而非"如何"

  • 异步处理:统一处理同步和异步操作

  • 组合性:通过操作符组合复杂逻辑

1.2 Rx.js 的优势

特性

说明

统一接口

处理 DOM 事件、Promise、WebSocket、定时器等

强大的操作符

提供上百个操作符处理数据流

取消机制

优雅地取消订阅,避免内存泄漏

错误处理

统一的错误处理机制

背压处理

控制数据流速率


2. 核心概念

2.1 Observable(可观察对象)

Observable 是 Rx.js 的核心,代表一个可观察的数据流。它可以发出三种类型的通知:

  1. Next:发出一个值

  2. Error:发出一个错误,终止流

  3. Complete:正常完成,终止流

import { Observable } from 'rxjs';

const observable = new Observable((subscriber) => {
  subscriber.next('Hello');
  subscriber.next('World');
  subscriber.complete();
});

2.2 Observer(观察者)

Observer 是一个包含三个回调函数的对象,用于处理 Observable 发出的通知:

const observer = {
  next: (value: T) => console.log('Value:', value),
  error: (err: Error) => console.error('Error:', err),
  complete: () => console.log('Completed')
};

// 订阅
observable.subscribe(observer);

2.3 Subscription(订阅)

Subscription 代表 Observable 的执行。调用 subscribe() 会返回一个 Subscription 对象,用于取消订阅:

const subscription = observable.subscribe(observer);

// 取消订阅
subscription.unsubscribe();

2.4 Operator(操作符)

操作符是用于处理 Observable 数据流的纯函数。Rx.js 提供了丰富的操作符,分为以下几类:

  • 创建操作符of, from, interval, timer

  • 转换操作符map, flatMap, switchMap, concatMap

  • 过滤操作符filter, take, skip, debounceTime

  • 组合操作符merge, concat, combineLatest, zip

  • 错误处理操作符catchError, retry, retryWhen

2.5 Subject(主题)

Subject 既是 Observable 也是 Observer,它可以多路广播值给多个订阅者:

import { Subject } from 'rxjs';

const subject = new Subject<number>();

subject.subscribe({
  next: (v) => console.log('Observer A:', v)
});

subject.subscribe({
  next: (v) => console.log('Observer B:', v)
});

subject.next(1);  // Observer A: 1, Observer B: 1
subject.next(2);  // Observer A: 2, Observer B: 2

2.6 Scheduler(调度器)

Scheduler 控制 Observable 发出通知的时机和线程:

  • queue:同步执行

  • asap:微任务队列

  • async:宏任务队列(setTimeout)

  • animationFrame:动画帧


3. 创建 Observable

3.1 创建操作符

3.1.1 of - 从多个值创建

import { of } from 'rxjs';

const source$ = of(1, 2, 3, 4, 5);
source$.subscribe(console.log); // 1, 2, 3, 4, 5

3.1.2 from - 从数组、Promise、可迭代对象创建

import { from } from 'rxjs';

// 从数组
from([1, 2, 3]).subscribe(console.log);

// 从 Promise
from(fetch('/api/data')).subscribe(response => console.log(response));

3.1.3 interval - 创建定时器序列

import { interval } from 'rxjs';

const source$ = interval(1000); // 每秒发出一个递增数字
source$.subscribe(console.log); // 0, 1, 2, 3, ...

3.1.4 timer - 延迟后发出值

import { timer } from 'rxjs';

// 2秒后发出0
timer(2000).subscribe(console.log);

// 1秒后开始,每2秒发出一个值
timer(1000, 2000).subscribe(console.log);

3.1.5 fromEvent - 从 DOM 事件创建

import { fromEvent } from 'rxjs';

const button = document.querySelector('button');
const clicks$ = fromEvent(button, 'click');
clicks$.subscribe(() => console.log('Button clicked'));

3.2 转换操作符

3.2.1 map - 转换每个值

import { of } from 'rxjs';
import { map } from 'rxjs/operators';

of(1, 2, 3).pipe(
  map(x => x * 2)
).subscribe(console.log); // 2, 4, 6

3.2.2 switchMap - 切换到新的 Observable

import { fromEvent, interval } from 'rxjs';
import { switchMap } from 'rxjs/operators';

fromEvent(document, 'click').pipe(
  switchMap(() => interval(1000))
).subscribe(console.log);
// 点击后开始每秒输出,再次点击重新开始

3.2.3 concatMap - 顺序连接 Observable

import { of, interval } from 'rxjs';
import { concatMap, take } from 'rxjs/operators';

of(1, 2, 3).pipe(
  concatMap(id => interval(1000).pipe(take(2)))
).subscribe(console.log);
// 0, 1, 0, 1, 0, 1

3.2.4 mergeMap - 并发合并 Observable

import { of, interval } from 'rxjs';
import { mergeMap, take } from 'rxjs/operators';

of(1, 2, 3).pipe(
  mergeMap(id => interval(1000).pipe(take(2)))
).subscribe(console.log);
// 0, 0, 0, 1, 1, 1

3.3 过滤操作符

3.3.1 filter - 过滤值

import { of } from 'rxjs';
import { filter } from 'rxjs/operators';

of(1, 2, 3, 4, 5).pipe(
  filter(x => x % 2 === 0)
).subscribe(console.log); // 2, 4

3.3.2 take - 只取前 N 个值

import { interval } from 'rxjs';
import { take } from 'rxjs/operators';

interval(1000).pipe(
  take(3)
).subscribe(console.log); // 0, 1, 2

3.3.3 debounceTime - 防抖

import { fromEvent } from 'rxjs';
import { debounceTime, map } from 'rxjs/operators';

const input = document.querySelector('input');
fromEvent(input, 'input').pipe(
  debounceTime(300),
  map((e: Event) => (e.target as HTMLInputElement).value)
).subscribe(console.log);

3.3.4 distinctUntilChanged - 去重连续重复值

import { of } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';

of(1, 1, 2, 2, 3, 1).pipe(
  distinctUntilChanged()
).subscribe(console.log); // 1, 2, 3, 1

3.4 组合操作符

3.4.1 combineLatest - 组合多个 Observable 的最新值

import { interval, combineLatest } from 'rxjs';
import { take } from 'rxjs/operators';

const source1$ = interval(1000).pipe(take(3));
const source2$ = interval(2000).pipe(take(2));

combineLatest([source1$, source2$]).subscribe(console.log);
// [0, 0], [1, 0], [2, 0], [2, 1]

3.4.2 forkJoin - 等待所有 Observable 完成

import { forkJoin, of, interval } from 'rxjs';
import { take } from 'rxjs/operators';

forkJoin([
  of(1, 2, 3),
  interval(1000).pipe(take(2))
]).subscribe(console.log); // [3, 1]

3.4.3 merge - 合并多个 Observable

import { merge, interval } from 'rxjs';
import { take } from 'rxjs/operators';

const source1$ = interval(500).pipe(take(3));
const source2$ = interval(1000).pipe(take(2));

merge(source1$, source2$).subscribe(console.log);
// 0, 0, 1, 1, 2

3.5 错误处理操作符

3.5.1 catchError - 捕获错误并返回备用 Observable

import { of, throwError } from 'rxjs';
import { catchError } from 'rxjs/operators';

throwError(() => new Error('Oops!')).pipe(
  catchError(() => of('fallback value'))
).subscribe(console.log); // 'fallback value'

3.5.2 retry - 重试

import { throwError, of } from 'rxjs';
import { retry, delay } from 'rxjs/operators';

let count = 0;
const source$ = of(null).pipe(
  delay(1000),
  map(() => {
    if (count++ < 2) throw new Error('Retry');
    return 'Success';
  })
);

source$.pipe(retry(2)).subscribe(console.log); // 'Success'

3.6 工具操作符

3.6.1 tap - 副作用操作

import { of } from 'rxjs';
import { tap, map } from 'rxjs/operators';

of(1, 2, 3).pipe(
  tap(x => console.log('Before:', x)),
  map(x => x * 2),
  tap(x => console.log('After:', x))
).subscribe();
// Before: 1, After: 2, Before: 2, After: 4, ...

3.6.2 delay - 延迟发出值

import { of } from 'rxjs';
import { delay } from 'rxjs/operators';

of(1, 2, 3).pipe(
  delay(1000)
).subscribe(console.log); // 1秒后输出 1, 2, 3

4. Subject 详解

4.1 BehaviorSubject

BehaviorSubject 会保存当前值,并在订阅时立即发出最新值:

import { BehaviorSubject } from 'rxjs';

const subject = new BehaviorSubject(0); // 初始值

subject.subscribe(v => console.log('A:', v)); // A: 0

subject.next(1); // A: 1

subject.subscribe(v => console.log('B:', v)); // B: 1

subject.next(2); // A: 2, B: 2

4.2 ReplaySubject

ReplaySubject 会重放指定数量的历史值给新订阅者:

import { ReplaySubject } from 'rxjs';

const subject = new ReplaySubject(2); // 重放最后2个值

subject.next(1);
subject.next(2);
subject.next(3);

subject.subscribe(v => console.log('A:', v)); // A: 2, A: 3

subject.next(4); // A: 4

4.3 AsyncSubject

AsyncSubject 只在源 Observable 完成时发出最后一个值:

import { AsyncSubject } from 'rxjs';

const subject = new AsyncSubject();

subject.subscribe(v => console.log('A:', v));

subject.next(1);
subject.next(2);
subject.complete(); // A: 2

5. 调度器 (Scheduler)

调度器控制 Observable 的执行时机:

import { of, asyncScheduler, queueScheduler, asapScheduler } from 'rxjs';
import { observeOn } from 'rxjs/operators';

// 异步执行
of(1, 2, 3).pipe(
  observeOn(asyncScheduler)
).subscribe(console.log);

// 微任务执行
of(4, 5, 6).pipe(
  observeOn(asapScheduler)
).subscribe(console.log);

console.log('Start');
// 输出顺序: Start, 4, 5, 6, 1, 2, 3

6. 实际应用场景

6.1 搜索输入防抖

import { fromEvent } from 'rxjs';
import { debounceTime, distinctUntilChanged, switchMap, map } from 'rxjs/operators';

const searchInput = document.querySelector('input#search');

fromEvent(searchInput, 'input').pipe(
  map((e: Event) => (e.target as HTMLInputElement).value),
  debounceTime(300),
  distinctUntilChanged(),
  switchMap(query => fetch(`/api/search?q=${query}`))
).subscribe(response => console.log(response));

6.2 无限滚动

import { fromEvent, Observable } from 'rxjs';
import { debounceTime, filter, switchMap, takeWhile } from 'rxjs/operators';

const scroll$ = fromEvent(window, 'scroll').pipe(
  debounceTime(100),
  map(() => {
    const scrollTop = window.scrollY;
    const windowHeight = window.innerHeight;
    const documentHeight = document.documentElement.scrollHeight;
    return documentHeight - scrollTop - windowHeight;
  }),
  filter(distance => distance < 200),
  switchMap(() => fetch('/api/next-page'))
);

scroll$.subscribe(data => {
  // 追加数据到列表
});

6.3 WebSocket 通信

import { webSocket } from 'rxjs/webSocket';

const socket$ = webSocket('ws://localhost:8080');

socket$.subscribe(
  message => console.log('Received:', message),
  error => console.error('Error:', error),
  () => console.log('Connection closed')
);

socket$.next({ type: 'subscribe', channel: 'updates' });

7. 最佳实践

7.1 取消订阅避免内存泄漏

import { interval, Subscription } from 'rxjs';

class MyComponent {
  private subscription: Subscription;

  ngOnInit() {
    this.subscription = interval(1000).subscribe(console.log);
  }

  ngOnDestroy() {
    this.subscription.unsubscribe();
  }
}

7.2 使用 takeUntil 管理订阅生命周期

import { interval, Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

class MyComponent {
  private destroy$ = new Subject<void>();

  ngOnInit() {
    interval(1000).pipe(
      takeUntil(this.destroy$)
    ).subscribe(console.log);
  }

  ngOnDestroy() {
    this.destroy$.next();
    this.destroy$.complete();
  }
}

7.3 使用操作符链而非嵌套订阅

// 避免
fromEvent(document, 'click').subscribe(() => {
  interval(1000).subscribe(console.log);
});

// 推荐
fromEvent(document, 'click').pipe(
  switchMap(() => interval(1000))
).subscribe(console.log);

7.4 正确处理错误

import { throwError, of } from 'rxjs';
import { catchError, retry } from 'rxjs/operators';

fetchData().pipe(
  retry(2),
  catchError(error => {
    console.error('Failed after retries:', error);
    return of([]); // 返回默认值
  })
).subscribe(data => processData(data));

8. 常见误区

8.1 忘记取消订阅

问题:订阅后不取消会导致内存泄漏

解决方案:使用 takeUntiltakefirst 等操作符,或在组件销毁时手动取消订阅

8.2 错误使用 switchMap/mergeMap/concatMap

操作符

行为

适用场景

switchMap

取消前一个,切换到新的

搜索输入、按钮点击

mergeMap

并行执行所有

独立请求、批量操作

concatMap

顺序执行

有依赖的请求

8.3 忽略背压

问题:当生产者速度远快于消费者时,会导致内存溢出

解决方案:使用 throttleTimedebounceTimebufferCount 等操作符控制流速

8.4 过度使用 Subject

问题:Subject 破坏了声明式编程原则

解决方案:优先使用纯 Observable 和操作符,只在必要时使用 Subject


参考资源