0

0

如何在 MongoDB 中暂停并恢复 Change Stream

花韻仙語

花韻仙語

发布时间:2026-02-26 20:35:00

|

651人浏览过

|

来源于php中文网

原创

如何在 MongoDB 中暂停并恢复 Change Stream

本文详解如何通过 reactive streams 的订阅控制机制,安全暂停和恢复 mongodb 的 change stream,结合 resume token 实现断点续传,适用于数据库维护等场景。

本文详解如何通过 reactive streams 的订阅控制机制,安全暂停和恢复 mongodb 的 change stream,结合 resume token 实现断点续传,适用于数据库维护等场景。

MongoDB 的 Change Stream 是监听集合数据变更的实时通道,但在生产环境中,常需临时中止(如执行数据库维护、索引重建或版本迁移),并在之后从中断位置精确恢复,而非丢失事件或重复消费。Reactive MongoDB 驱动(如 Spring Data MongoDB Reactive)基于 Project Reactor 构建,其 Flux 天然支持背压与生命周期管理——关键在于主动控制订阅(Subscription),而非简单“关闭流”。

✅ 正确暂停:取消订阅(Dispose)

Change Stream 本身是冷流(cold Flux),只有被订阅后才启动监听。暂停的本质是终止当前订阅,释放资源:

// 启动监听并获取可处置的订阅引用
Disposable subscription = service.watch()
    .doOnNext(example -> exampleService.doSomething(example))
    .doOnError(error -> log.error("Change stream error", error))
    .doOnCancel(() -> log.info("Change stream cancelled"))
    .subscribe();

// 暂停:立即终止当前流,关闭底层 cursor,不触发新事件
subscription.dispose();

⚠️ 注意:dispose() 是即时且不可逆的——它会关闭当前 cursor,但不会自动保存 resume token。若需后续恢复,必须在取消前显式提取并持久化 token。

Replit Agent
Replit Agent

Replit最新推出的AI编程工具,可以帮助用户从零开始自动构建应用程序。

下载

✅ 安全恢复:携带 resumeToken 重启流

MongoDB Change Stream 支持通过 resumeAfter 或 startAfter 选项从指定 token 继续。需在暂停前捕获最新 token,并在恢复时注入:

// 在 watch() 中增强:暴露并缓存最新 resumeToken
private final AtomicReference<BsonDocument> latestResumeToken = new AtomicReference<>();

public Flux<Example> watch() {
    ChangeStreamOptions options = ChangeStreamOptions.builder()
        .returnFullDocumentOnUpdate()
        .build();

    return reactiveMongoTemplate.changeStream("collection", options, Example.class)
        .doOnNext(event -> {
            // 关键:每次收到事件后更新最新 token(非 null 才有效)
            if (event.getResumeToken() != null) {
                latestResumeToken.set(event.getResumeToken());
            }
        })
        .filter(e -> e.getOperationType() != null)
        .mapNotNull(ChangeStreamEvent::getBody);
}

// 恢复方法:使用上次保存的 token 创建新流
public Flux<Example> resumeFromLastToken() {
    BsonDocument token = latestResumeToken.get();
    if (token == null) {
        throw new IllegalStateException("No valid resume token available. Start fresh or handle initial sync.");
    }

    ChangeStreamOptions options = ChangeStreamOptions.builder()
        .resumeAfter(token) // ← 核心:从该 token 后续事件开始
        .returnFullDocumentOnUpdate()
        .build();

    return reactiveMongoTemplate.changeStream("collection", options, Example.class)
        .filter(e -> e.getOperationType() != null)
        .mapNotNull(ChangeStreamEvent::getBody);
}

? 关键注意事项

  • Token 有效性:resumeAfter 要求 token 在 oplog 时间窗口内有效(默认 24 小时)。长时间停机需确认 oplog 容量是否足够。
  • 幂等性设计:即使因网络抖动导致重复事件(如 resume token 对应事件已处理),业务逻辑应具备幂等处理能力。
  • 状态持久化:生产环境建议将 latestResumeToken 存入 Redis 或数据库,避免服务重启后 token 丢失。
  • 错误重试策略:在 Flux 链中添加 .retryBackoff() 可应对临时连接失败,但需配合 token 管理,避免跳过事件。

✅ 总结

暂停 Change Stream ≠ “暂停线程”,而是取消 Reactor 订阅以释放资源;恢复 ≠ 重启流,而是用上次成功的 resume token 构造新流。整个过程依赖对 Disposable 生命周期的精准控制与对 MongoDB 底层变更日志机制的理解。合理封装 token 管理逻辑后,即可构建高可用、可运维的实时数据同步服务。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
spring框架介绍
spring框架介绍

本专题整合了spring框架相关内容,想了解更多详细内容,请阅读专题下面的文章。

145

2025.08.06

Java Spring Security 与认证授权
Java Spring Security 与认证授权

本专题系统讲解 Java Spring Security 框架在认证与授权中的应用,涵盖用户身份验证、权限控制、JWT与OAuth2实现、跨站请求伪造(CSRF)防护、会话管理与安全漏洞防范。通过实际项目案例,帮助学习者掌握如何 使用 Spring Security 实现高安全性认证与授权机制,提升 Web 应用的安全性与用户数据保护。

82

2026.01.26

登录token无效
登录token无效

登录token无效解决方法:1、检查token的有效期限,如果token已经过期,需要重新获取一个新的token;2、检查token的签名,如果签名不正确,需要重新获取一个新的token;3、检查密钥的正确性,如果密钥不正确,需要重新获取一个新的token;4、使用HTTPS协议传输token,建议使用HTTPS协议进行传输 ;5、使用双因素认证,双因素认证可以提高账户的安全性。

6461

2023.09.14

登录token无效怎么办
登录token无效怎么办

登录token无效的解决办法有检查Token是否过期、检查Token是否正确、检查Token是否被篡改、检查Token是否与用户匹配、清除缓存或Cookie、检查网络连接和服务器状态、重新登录或请求新的Token、联系技术支持或开发人员等。本专题为大家提供token相关的文章、下载、课程内容,供大家免费下载体验。

838

2023.09.14

token怎么获取
token怎么获取

获取token值的方法:1、小程序调用“wx.login()”获取 临时登录凭证code,并回传到开发者服务器;2、开发者服务器以code换取,用户唯一标识openid和会话密钥“session_key”。想了解更详细的内容,可以阅读本专题下面的文章。

1087

2023.12.21

token什么意思
token什么意思

token是一种用于表示用户权限、记录交易信息、支付虚拟货币的数字货币。可以用来在特定的网络上进行交易,用来购买或出售特定的虚拟货币,也可以用来支付特定的服务费用。想了解更多token什么意思的相关内容可以访问本专题下面的文章。

1728

2024.03.01

线程和进程的区别
线程和进程的区别

线程和进程的区别:线程是进程的一部分,用于实现并发和并行操作,而线程共享进程的资源,通信更方便快捷,切换开销较小。本专题为大家提供线程和进程区别相关的各种文章、以及下载和课程。

721

2023.08.10

常用的数据库软件
常用的数据库软件

常用的数据库软件有MySQL、Oracle、SQL Server、PostgreSQL、MongoDB、Redis、Cassandra、Hadoop、Spark和Amazon DynamoDB。更多关于数据库软件的内容详情请看本专题下面的文章。php中文网欢迎大家前来学习。

999

2023.11.02

Golang 实际项目案例:从需求到上线
Golang 实际项目案例:从需求到上线

《Golang 实际项目案例:从需求到上线》以真实业务场景为主线,完整覆盖需求分析、架构设计、模块拆分、编码实现、性能优化与部署上线全过程,强调工程规范与实践决策,帮助开发者打通从技术实现到系统交付的关键路径,提升独立完成 Go 项目的综合能力。

1

2026.02.26

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Go 教程
Go 教程

共32课时 | 5.6万人学习

Go语言实战之 GraphQL
Go语言实战之 GraphQL

共10课时 | 0.9万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号