
本文详解如何通过 reactive streams 的订阅控制机制,安全暂停和恢复 mongodb 的 change stream,结合 resume token 实现断点续传,适用于数据库维护等场景。
本文详解如何通过 reactive streams 的订阅控制机制,安全暂停和恢复 mongodb 的 change stream,结合 resume token 实现断点续传,适用于数据库维护等场景。
MongoDB 的 Change Stream 是监听集合数据变更的实时通道,但在生产环境中,常需临时中止(如执行数据库维护、索引重建或版本迁移),并在之后从中断位置精确恢复,而非丢失事件或重复消费。Reactive MongoDB 驱动(如 Spring Data MongoDB Reactive)基于 Project Reactor 构建,其 Flux 天然支持背压与生命周期管理——关键在于主动控制订阅(Subscription),而非简单“关闭流”。
✅ 正确暂停:取消订阅(Dispose)
Change Stream 本身是冷流(cold Flux),只有被订阅后才启动监听。暂停的本质是终止当前订阅,释放资源:
// 启动监听并获取可处置的订阅引用
Disposable subscription = service.watch()
.doOnNext(example -> exampleService.doSomething(example))
.doOnError(error -> log.error("Change stream error", error))
.doOnCancel(() -> log.info("Change stream cancelled"))
.subscribe();
// 暂停:立即终止当前流,关闭底层 cursor,不触发新事件
subscription.dispose();⚠️ 注意:dispose() 是即时且不可逆的——它会关闭当前 cursor,但不会自动保存 resume token。若需后续恢复,必须在取消前显式提取并持久化 token。
✅ 安全恢复:携带 resumeToken 重启流
MongoDB Change Stream 支持通过 resumeAfter 或 startAfter 选项从指定 token 继续。需在暂停前捕获最新 token,并在恢复时注入:
// 在 watch() 中增强:暴露并缓存最新 resumeToken
private final AtomicReference<BsonDocument> latestResumeToken = new AtomicReference<>();
public Flux<Example> watch() {
ChangeStreamOptions options = ChangeStreamOptions.builder()
.returnFullDocumentOnUpdate()
.build();
return reactiveMongoTemplate.changeStream("collection", options, Example.class)
.doOnNext(event -> {
// 关键:每次收到事件后更新最新 token(非 null 才有效)
if (event.getResumeToken() != null) {
latestResumeToken.set(event.getResumeToken());
}
})
.filter(e -> e.getOperationType() != null)
.mapNotNull(ChangeStreamEvent::getBody);
}
// 恢复方法:使用上次保存的 token 创建新流
public Flux<Example> resumeFromLastToken() {
BsonDocument token = latestResumeToken.get();
if (token == null) {
throw new IllegalStateException("No valid resume token available. Start fresh or handle initial sync.");
}
ChangeStreamOptions options = ChangeStreamOptions.builder()
.resumeAfter(token) // ← 核心:从该 token 后续事件开始
.returnFullDocumentOnUpdate()
.build();
return reactiveMongoTemplate.changeStream("collection", options, Example.class)
.filter(e -> e.getOperationType() != null)
.mapNotNull(ChangeStreamEvent::getBody);
}? 关键注意事项
- Token 有效性:resumeAfter 要求 token 在 oplog 时间窗口内有效(默认 24 小时)。长时间停机需确认 oplog 容量是否足够。
- 幂等性设计:即使因网络抖动导致重复事件(如 resume token 对应事件已处理),业务逻辑应具备幂等处理能力。
- 状态持久化:生产环境建议将 latestResumeToken 存入 Redis 或数据库,避免服务重启后 token 丢失。
- 错误重试策略:在 Flux 链中添加 .retryBackoff() 可应对临时连接失败,但需配合 token 管理,避免跳过事件。
✅ 总结
暂停 Change Stream ≠ “暂停线程”,而是取消 Reactor 订阅以释放资源;恢复 ≠ 重启流,而是用上次成功的 resume token 构造新流。整个过程依赖对 Disposable 生命周期的精准控制与对 MongoDB 底层变更日志机制的理解。合理封装 token 管理逻辑后,即可构建高可用、可运维的实时数据同步服务。










