
本文详解如何在 Spring Data MongoDB Reactive 中动态暂停和恢复 Change Stream,利用 Disposable 控制订阅生命周期,并结合 resume token 实现断点续传,适用于数据库维护等场景。
本文详解如何在 spring data mongodb reactive 中动态暂停和恢复 change stream,利用 `disposable` 控制订阅生命周期,并结合 resume token 实现断点续传,适用于数据库维护等场景。
在响应式应用中,MongoDB 的 Change Stream 是实现实时数据同步的核心机制。但实际运维中(如集合重建、索引优化或批量迁移),常需临时中止监听、执行维护操作、再从断点精准恢复——而非简单重启流导致事件丢失或重复。Spring Data MongoDB Reactive 提供了基于 Project Reactor 的 Flux
✅ 暂停 Change Stream:取消订阅即可
Change Stream 本质是一个冷流(cold Flux),其执行依赖于下游订阅。调用 dispose() 即可立即终止当前订阅,释放资源并停止事件推送:
// 启动监听并持有 Disposable 引用
private volatile Disposable currentSubscription;
public void startWatching() {
if (currentSubscription == null || currentSubscription.isDisposed()) {
currentSubscription = reactiveMongoTemplate
.changeStream("collection",
ChangeStreamOptions.builder()
.returnFullDocumentOnUpdate()
.build(),
Example.class)
.filter(event -> event.getOperationType() != null)
.mapNotNull(ChangeStreamEvent::getBody)
.subscribe(
example -> exampleService.doSomething(example),
error -> log.error("Change stream error", error),
() -> log.info("Change stream completed")
);
}
}
public void stopWatching() {
if (currentSubscription != null && !currentSubscription.isDisposed()) {
currentSubscription.dispose(); // ✅ 立即停止流,无副作用
currentSubscription = null;
}
}⚠️ 注意:dispose() 是非阻塞、即时生效的操作,不会等待正在处理的事件完成,因此需确保 doSomething() 方法具备幂等性或事务一致性。
? 恢复 Change Stream:利用 Resume Token 实现断点续传
单纯重启流会从最新时间点开始,丢失暂停期间的变更。要真正“从断点恢复”,必须在暂停前保存 resumeToken,并在恢复时传入:
private volatile Bson resumeToken;
// 修改 watch() 方法,捕获并缓存 resumeToken
public Flux<ChangeStreamEvent<Example>> watchWithResumeSupport() {
return reactiveMongoTemplate.changeStream(
"collection",
ChangeStreamOptions.builder()
.returnFullDocumentOnUpdate()
.resumeAfter(resumeToken) // ? 关键:恢复时指定 token
.build(),
Example.class
).doOnNext(event -> {
// 持久化最新 resumeToken(建议存入 Redis 或本地内存,避免单点故障)
resumeToken = event.getResumeToken();
});
}
// 恢复监听(调用前确保 resumeToken 已设置)
public void resumeWatching() {
if (resumeToken == null) {
log.warn("No resume token available; starting from latest");
}
currentSubscription = watchWithResumeSupport()
.filter(event -> event.getOperationType() != null)
.mapNotNull(ChangeStreamEvent::getBody)
.subscribe(example -> exampleService.doSomething(example));
}? 最佳实践与注意事项
- Token 存储可靠性:resumeToken 应定期持久化(如每 5 秒写入 Redis),避免进程崩溃导致 token 丢失;生产环境推荐使用带 TTL 的分布式存储。
- 空 token 处理:首次启动或 token 无效时,resumeAfter(null) 等价于从最新时间开始,符合预期。
-
错误重试策略:网络中断等异常应触发自动重连 + token 恢复,可结合 retryWhen() 封装健壮流:
.retryWhen(Retry.backoff(3, Duration.ofSeconds(2)) .filter(throwable -> throwable instanceof MongoSocketReadException)) - 资源清理:dispose() 后务必置空引用,防止内存泄漏;建议配合 @PreDestroy 在 Bean 销毁时兜底清理。
通过 Disposable 精确控制生命周期 + resumeAfter() 保障语义连续性,你可以在不牺牲数据一致性的前提下,灵活调度 Change Stream,真正将变更监听纳入可控运维体系。










