1. 什么是 Rx.js
Rx.js (Reactive Extensions for JavaScript) 是一个基于观察者模式和迭代器模式的响应式编程库。它将异步事件流(如用户输入、HTTP 请求、WebSocket 消息等)抽象为 Observable,通过强大的操作符进行转换、过滤和组合。
1.1 响应式编程的核心思想
响应式编程是一种面向数据流和变化传播的编程范式。在 Rx.js 中:
数据流:所有事件都被视为可观察的序列
声明式编程:描述"什么"而非"如何"
异步处理:统一处理同步和异步操作
组合性:通过操作符组合复杂逻辑
1.2 Rx.js 的优势
2. 核心概念
2.1 Observable(可观察对象)
Observable 是 Rx.js 的核心,代表一个可观察的数据流。它可以发出三种类型的通知:
Next:发出一个值
Error:发出一个错误,终止流
Complete:正常完成,终止流
import { Observable } from 'rxjs';
const observable = new Observable((subscriber) => {
subscriber.next('Hello');
subscriber.next('World');
subscriber.complete();
});2.2 Observer(观察者)
Observer 是一个包含三个回调函数的对象,用于处理 Observable 发出的通知:
const observer = {
next: (value: T) => console.log('Value:', value),
error: (err: Error) => console.error('Error:', err),
complete: () => console.log('Completed')
};
// 订阅
observable.subscribe(observer);2.3 Subscription(订阅)
Subscription 代表 Observable 的执行。调用 subscribe() 会返回一个 Subscription 对象,用于取消订阅:
const subscription = observable.subscribe(observer);
// 取消订阅
subscription.unsubscribe();2.4 Operator(操作符)
操作符是用于处理 Observable 数据流的纯函数。Rx.js 提供了丰富的操作符,分为以下几类:
创建操作符:
of,from,interval,timer转换操作符:
map,flatMap,switchMap,concatMap过滤操作符:
filter,take,skip,debounceTime组合操作符:
merge,concat,combineLatest,zip错误处理操作符:
catchError,retry,retryWhen
2.5 Subject(主题)
Subject 既是 Observable 也是 Observer,它可以多路广播值给多个订阅者:
import { Subject } from 'rxjs';
const subject = new Subject<number>();
subject.subscribe({
next: (v) => console.log('Observer A:', v)
});
subject.subscribe({
next: (v) => console.log('Observer B:', v)
});
subject.next(1); // Observer A: 1, Observer B: 1
subject.next(2); // Observer A: 2, Observer B: 22.6 Scheduler(调度器)
Scheduler 控制 Observable 发出通知的时机和线程:
queue:同步执行asap:微任务队列async:宏任务队列(setTimeout)animationFrame:动画帧
3. 创建 Observable
3.1 创建操作符
3.1.1 of - 从多个值创建
import { of } from 'rxjs';
const source$ = of(1, 2, 3, 4, 5);
source$.subscribe(console.log); // 1, 2, 3, 4, 53.1.2 from - 从数组、Promise、可迭代对象创建
import { from } from 'rxjs';
// 从数组
from([1, 2, 3]).subscribe(console.log);
// 从 Promise
from(fetch('/api/data')).subscribe(response => console.log(response));3.1.3 interval - 创建定时器序列
import { interval } from 'rxjs';
const source$ = interval(1000); // 每秒发出一个递增数字
source$.subscribe(console.log); // 0, 1, 2, 3, ...3.1.4 timer - 延迟后发出值
import { timer } from 'rxjs';
// 2秒后发出0
timer(2000).subscribe(console.log);
// 1秒后开始,每2秒发出一个值
timer(1000, 2000).subscribe(console.log);3.1.5 fromEvent - 从 DOM 事件创建
import { fromEvent } from 'rxjs';
const button = document.querySelector('button');
const clicks$ = fromEvent(button, 'click');
clicks$.subscribe(() => console.log('Button clicked'));3.2 转换操作符
3.2.1 map - 转换每个值
import { of } from 'rxjs';
import { map } from 'rxjs/operators';
of(1, 2, 3).pipe(
map(x => x * 2)
).subscribe(console.log); // 2, 4, 63.2.2 switchMap - 切换到新的 Observable
import { fromEvent, interval } from 'rxjs';
import { switchMap } from 'rxjs/operators';
fromEvent(document, 'click').pipe(
switchMap(() => interval(1000))
).subscribe(console.log);
// 点击后开始每秒输出,再次点击重新开始3.2.3 concatMap - 顺序连接 Observable
import { of, interval } from 'rxjs';
import { concatMap, take } from 'rxjs/operators';
of(1, 2, 3).pipe(
concatMap(id => interval(1000).pipe(take(2)))
).subscribe(console.log);
// 0, 1, 0, 1, 0, 13.2.4 mergeMap - 并发合并 Observable
import { of, interval } from 'rxjs';
import { mergeMap, take } from 'rxjs/operators';
of(1, 2, 3).pipe(
mergeMap(id => interval(1000).pipe(take(2)))
).subscribe(console.log);
// 0, 0, 0, 1, 1, 13.3 过滤操作符
3.3.1 filter - 过滤值
import { of } from 'rxjs';
import { filter } from 'rxjs/operators';
of(1, 2, 3, 4, 5).pipe(
filter(x => x % 2 === 0)
).subscribe(console.log); // 2, 43.3.2 take - 只取前 N 个值
import { interval } from 'rxjs';
import { take } from 'rxjs/operators';
interval(1000).pipe(
take(3)
).subscribe(console.log); // 0, 1, 23.3.3 debounceTime - 防抖
import { fromEvent } from 'rxjs';
import { debounceTime, map } from 'rxjs/operators';
const input = document.querySelector('input');
fromEvent(input, 'input').pipe(
debounceTime(300),
map((e: Event) => (e.target as HTMLInputElement).value)
).subscribe(console.log);3.3.4 distinctUntilChanged - 去重连续重复值
import { of } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';
of(1, 1, 2, 2, 3, 1).pipe(
distinctUntilChanged()
).subscribe(console.log); // 1, 2, 3, 13.4 组合操作符
3.4.1 combineLatest - 组合多个 Observable 的最新值
import { interval, combineLatest } from 'rxjs';
import { take } from 'rxjs/operators';
const source1$ = interval(1000).pipe(take(3));
const source2$ = interval(2000).pipe(take(2));
combineLatest([source1$, source2$]).subscribe(console.log);
// [0, 0], [1, 0], [2, 0], [2, 1]3.4.2 forkJoin - 等待所有 Observable 完成
import { forkJoin, of, interval } from 'rxjs';
import { take } from 'rxjs/operators';
forkJoin([
of(1, 2, 3),
interval(1000).pipe(take(2))
]).subscribe(console.log); // [3, 1]3.4.3 merge - 合并多个 Observable
import { merge, interval } from 'rxjs';
import { take } from 'rxjs/operators';
const source1$ = interval(500).pipe(take(3));
const source2$ = interval(1000).pipe(take(2));
merge(source1$, source2$).subscribe(console.log);
// 0, 0, 1, 1, 23.5 错误处理操作符
3.5.1 catchError - 捕获错误并返回备用 Observable
import { of, throwError } from 'rxjs';
import { catchError } from 'rxjs/operators';
throwError(() => new Error('Oops!')).pipe(
catchError(() => of('fallback value'))
).subscribe(console.log); // 'fallback value'3.5.2 retry - 重试
import { throwError, of } from 'rxjs';
import { retry, delay } from 'rxjs/operators';
let count = 0;
const source$ = of(null).pipe(
delay(1000),
map(() => {
if (count++ < 2) throw new Error('Retry');
return 'Success';
})
);
source$.pipe(retry(2)).subscribe(console.log); // 'Success'3.6 工具操作符
3.6.1 tap - 副作用操作
import { of } from 'rxjs';
import { tap, map } from 'rxjs/operators';
of(1, 2, 3).pipe(
tap(x => console.log('Before:', x)),
map(x => x * 2),
tap(x => console.log('After:', x))
).subscribe();
// Before: 1, After: 2, Before: 2, After: 4, ...3.6.2 delay - 延迟发出值
import { of } from 'rxjs';
import { delay } from 'rxjs/operators';
of(1, 2, 3).pipe(
delay(1000)
).subscribe(console.log); // 1秒后输出 1, 2, 34. Subject 详解
4.1 BehaviorSubject
BehaviorSubject 会保存当前值,并在订阅时立即发出最新值:
import { BehaviorSubject } from 'rxjs';
const subject = new BehaviorSubject(0); // 初始值
subject.subscribe(v => console.log('A:', v)); // A: 0
subject.next(1); // A: 1
subject.subscribe(v => console.log('B:', v)); // B: 1
subject.next(2); // A: 2, B: 24.2 ReplaySubject
ReplaySubject 会重放指定数量的历史值给新订阅者:
import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(2); // 重放最后2个值
subject.next(1);
subject.next(2);
subject.next(3);
subject.subscribe(v => console.log('A:', v)); // A: 2, A: 3
subject.next(4); // A: 44.3 AsyncSubject
AsyncSubject 只在源 Observable 完成时发出最后一个值:
import { AsyncSubject } from 'rxjs';
const subject = new AsyncSubject();
subject.subscribe(v => console.log('A:', v));
subject.next(1);
subject.next(2);
subject.complete(); // A: 25. 调度器 (Scheduler)
调度器控制 Observable 的执行时机:
import { of, asyncScheduler, queueScheduler, asapScheduler } from 'rxjs';
import { observeOn } from 'rxjs/operators';
// 异步执行
of(1, 2, 3).pipe(
observeOn(asyncScheduler)
).subscribe(console.log);
// 微任务执行
of(4, 5, 6).pipe(
observeOn(asapScheduler)
).subscribe(console.log);
console.log('Start');
// 输出顺序: Start, 4, 5, 6, 1, 2, 36. 实际应用场景
6.1 搜索输入防抖
import { fromEvent } from 'rxjs';
import { debounceTime, distinctUntilChanged, switchMap, map } from 'rxjs/operators';
const searchInput = document.querySelector('input#search');
fromEvent(searchInput, 'input').pipe(
map((e: Event) => (e.target as HTMLInputElement).value),
debounceTime(300),
distinctUntilChanged(),
switchMap(query => fetch(`/api/search?q=${query}`))
).subscribe(response => console.log(response));6.2 无限滚动
import { fromEvent, Observable } from 'rxjs';
import { debounceTime, filter, switchMap, takeWhile } from 'rxjs/operators';
const scroll$ = fromEvent(window, 'scroll').pipe(
debounceTime(100),
map(() => {
const scrollTop = window.scrollY;
const windowHeight = window.innerHeight;
const documentHeight = document.documentElement.scrollHeight;
return documentHeight - scrollTop - windowHeight;
}),
filter(distance => distance < 200),
switchMap(() => fetch('/api/next-page'))
);
scroll$.subscribe(data => {
// 追加数据到列表
});6.3 WebSocket 通信
import { webSocket } from 'rxjs/webSocket';
const socket$ = webSocket('ws://localhost:8080');
socket$.subscribe(
message => console.log('Received:', message),
error => console.error('Error:', error),
() => console.log('Connection closed')
);
socket$.next({ type: 'subscribe', channel: 'updates' });7. 最佳实践
7.1 取消订阅避免内存泄漏
import { interval, Subscription } from 'rxjs';
class MyComponent {
private subscription: Subscription;
ngOnInit() {
this.subscription = interval(1000).subscribe(console.log);
}
ngOnDestroy() {
this.subscription.unsubscribe();
}
}7.2 使用 takeUntil 管理订阅生命周期
import { interval, Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
class MyComponent {
private destroy$ = new Subject<void>();
ngOnInit() {
interval(1000).pipe(
takeUntil(this.destroy$)
).subscribe(console.log);
}
ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
}7.3 使用操作符链而非嵌套订阅
// 避免
fromEvent(document, 'click').subscribe(() => {
interval(1000).subscribe(console.log);
});
// 推荐
fromEvent(document, 'click').pipe(
switchMap(() => interval(1000))
).subscribe(console.log);7.4 正确处理错误
import { throwError, of } from 'rxjs';
import { catchError, retry } from 'rxjs/operators';
fetchData().pipe(
retry(2),
catchError(error => {
console.error('Failed after retries:', error);
return of([]); // 返回默认值
})
).subscribe(data => processData(data));8. 常见误区
8.1 忘记取消订阅
问题:订阅后不取消会导致内存泄漏
解决方案:使用 takeUntil、take、first 等操作符,或在组件销毁时手动取消订阅
8.2 错误使用 switchMap/mergeMap/concatMap
8.3 忽略背压
问题:当生产者速度远快于消费者时,会导致内存溢出
解决方案:使用 throttleTime、debounceTime、bufferCount 等操作符控制流速
8.4 过度使用 Subject
问题:Subject 破坏了声明式编程原则
解决方案:优先使用纯 Observable 和操作符,只在必要时使用 Subject