0

0

响应式编程中 Reactor Mono/Flux 实现轮询请求的策略与实践

碧海醫心

碧海醫心

发布时间:2025-11-09 15:06:01

|

1016人浏览过

|

来源于php中文网

原创

响应式编程中 reactor mono/flux 实现轮询请求的策略与实践

本教程探讨了在 Reactor 响应式编程中实现外部系统状态轮询的两种主要策略。首先介绍基于 `Mono` 的 `retryWhen` 机制,适用于在错误发生时重试。随后深入讲解利用 `Flux.interval` 实现固定间隔轮询的方法,该方法在控制请求频率、并发性及避免异常流控制方面具有优势,并提供了详细的代码示例和选择考量。

引言:响应式轮询的需求

在构建现代分布式系统时,我们经常需要与外部服务交互,并等待其状态从“处理中”变为“就绪”。这种持续检查外部状态直到满足特定条件的行为,通常通过轮询(Polling)机制实现。在基于 Project Reactor 的响应式编程范式中,实现高效、健壮且资源友好的轮询策略是关键。本文将深入探讨两种主要的 Reactor 轮询实现方法:基于 retryWhen 的错误重试机制和基于 Flux.interval 的固定间隔轮询策略。

基于 retryWhen 的轮询实现

最初,开发者可能会倾向于使用 Reactor 的 retryWhen 操作符来处理轮询场景。这种方法的核心思想是:当外部系统状态不满足条件时,通过抛出特定异常来触发重试机制,从而实现周期性的状态检查。

实现原理

该方法通常结合 WebClient 发起请求,并通过 filter 操作符检查返回的状态。如果状态不符合预期(例如,系统尚未就绪),则使用 switchIfEmpty 结合 Mono.error() 抛出一个自定义异常。随后,retryWhen 操作符捕获此异常,并根据配置的重试策略(如固定延迟)重新订阅上游流,再次发起请求。

以下是一个典型的实现示例:

Magic AI Avatars
Magic AI Avatars

神奇的AI头像,获得200多个由AI制作的自定义头像。

下载
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import org.springframework.web.reactive.function.client.WebClient;
import java.time.Duration;

// 假设 Status 是一个包含 isReady() 方法的枚举或类
// 假设 SystemStateNotReadyException 是一个自定义异常

public class PollingWithRetryWhen {

    private final WebClient webClient;
    private final int MAX_ATTEMPT = 5; // 最大重试次数
    private final Duration BACK_OFF = Duration.ofSeconds(1); // 重试间隔

    public PollingWithRetryWhen(WebClient webClient) {
        this.webClient = webClient;
    }

    /**
     * 模拟从外部系统获取状态。
     * 实际应用中会通过 webClient 调用外部API。
     */
    private Mono<Status> checkStatus() {
        return webClient.get()
                        .uri("/api/status")
                        .retrieve()
                        .bodyToMono(String.class)
                        .map(response -> Status.from(response)); // 假设 Status.from(String) 能够解析响应
    }

    /**
     * 轮询外部系统直到其状态变为就绪。
     * 如果未就绪,则抛出异常并重试。
     */
    public Mono<Status> pollUntilReady() {
        return checkStatus()
                .filter(Status::isReady) // 过滤出已就绪的状态
                .switchIfEmpty(
                    Mono.error(new SystemStateNotReadyException("System is not ready yet.")) // 如果未就绪,则抛出异常
                )
                .retryWhen(
                    Retry.fixedDelay(MAX_ATTEMPT, BACK_OFF) // 固定延迟重试
                         .filter(err -> err instanceof SystemStateNotReadyException) // 只对特定异常进行重试
                );
    }
}

// 示例 Status 类,代表外部系统状态
class Status {
    private final boolean ready;

    public Status(boolean ready) {
        this.ready = ready;
    }

    public boolean isReady() {
        return ready;
    }

    public static Status from(String response) {
        // 模拟解析逻辑,例如根据响应字符串判断是否就绪
        return new Status(response.contains("READY"));
    }
}

// 示例自定义异常,用于在系统未就绪时触发重试
class SystemStateNotReadyException extends RuntimeException {
    public SystemStateNotReadyException(String message) {
        super(message);
    }
}

优点与考量

  • 简洁性: 对于简单的重试逻辑,retryWhen 提供了一种声明式且易于理解的方式。
  • 错误驱动: 这种方法天然地将轮询与错误处理相结合,只有在状态不满足条件(被视为一种“错误”)时才触发重试。
  • 线程安全与内存: Reactor 框架本身在设计时就考虑了线程安全和资源管理。上述代码片段在典型使用场景下通常是线程安全的,并且不会导致内存泄漏,因为 Reactor 的操作符会妥善管理订阅和资源。

然而,retryWhen 的核心是“重试”,这意味着它通常在操作失败(抛出异常)后才触发。如果每次轮询请求本身是成功的(即返回了状态,只是状态值不符合预期),但我们仍想以固定间隔进行检查,那么通过抛出异常来控制流程可能不是最优雅或最高效的方式。

基于 Flux.interval 的高级轮询策略

当需要以精确的固定间隔进行轮询,并且希望更精细地控制轮询过程(例如,独立于每次请求的响应时间),或者避免使用异常来控制正常流程时,Flux.interval 提供了一个更强大的替代方案。

实现原理

Flux.interval 会周期性地发出递增的 Long 值,每个值代表一个时间间隔。我们可以将这些周期性信号映射(concatMap 或 flatMap)到我们的状态检查请求中。通过这种方式,即使某个状态检查请求耗时较长,下一个请求也会在预设的固定间隔后触发,而不是等待上一个请求完全完成后再计算延迟。

优点

  • 固定间隔: 确保请求以固定的时间间隔发送,独立于单个请求的响应时间。
  • 计数器: Flux.interval 发出的 Long 值可以作为轮询尝试的计数器。
  • 并发控制: 可以选择 concatMap(顺序执行,等待前一个完成)或 flatMap(并发执行,不等待前一个完成)来控制状态检查请求的并发行为。
  • 避免异常流控制: 不需要通过抛出和捕获异常来控制轮询流程,这在某些情况下可能带来性能优势。

示例代码

以下是一个使用 Flux.interval 实现固定间隔轮询的示例:

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.web.reactive.function.client.WebClient;
import java.time.Duration;
import java.util.concurrent.TimeoutException; // 用于可能的超时异常

public class PollingWithFluxInterval {

    private final WebClient webClient;
    private final int MAX_ATTEMPTS = 10; // 最大轮询尝试次数
    private final Duration INTERVAL = Duration.ofMillis(100); // 每100毫秒发送一次请求

    public PollingWithFluxInterval(WebClient webClient) {
        this.webClient = webClient;
    }

    /**
     * 模拟从外部系统获取状态,返回一个包含尝试次数和状态就绪情况的报告。
     * 模拟了每次请求的网络延迟。
     */
    private Mono<Report> fetchStatus(long count) {
        // 模拟网络请求和处理时间,例如50ms
        return webClient.get()
                        .uri("/api/status")

相关文章

编程速学教程(入门课程)
编程速学教程(入门课程)

编程怎么学习?编程怎么入门?编程在哪学?编程怎么学才快?不用担心,这里为大家提供了编程速学教程(入门课程),有需要的小伙伴保存下载就能学习啦!

下载

本站声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

WorkBuddy
WorkBuddy

腾讯云推出的AI原生桌面智能体工作台

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
什么是分布式
什么是分布式

分布式是一种计算和数据处理的方式,将计算任务或数据分散到多个计算机或节点中进行处理。本专题为大家提供分布式相关的文章、下载、课程内容,供大家免费下载体验。

407

2023.08.11

分布式和微服务的区别
分布式和微服务的区别

分布式和微服务的区别在定义和概念、设计思想、粒度和复杂性、服务边界和自治性、技术栈和部署方式等。本专题为大家提供分布式和微服务相关的文章、下载、课程内容,供大家免费下载体验。

251

2023.10.07

scripterror怎么解决
scripterror怎么解决

scripterror的解决办法有检查语法、文件路径、检查网络连接、浏览器兼容性、使用try-catch语句、使用开发者工具进行调试、更新浏览器和JavaScript库或寻求专业帮助等。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

492

2023.10.18

500error怎么解决
500error怎么解决

500error的解决办法有检查服务器日志、检查代码、检查服务器配置、更新软件版本、重新启动服务、调试代码和寻求帮助等。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

382

2023.10.25

线程和进程的区别
线程和进程的区别

线程和进程的区别:线程是进程的一部分,用于实现并发和并行操作,而线程共享进程的资源,通信更方便快捷,切换开销较小。本专题为大家提供线程和进程区别相关的各种文章、以及下载和课程。

765

2023.08.10

C# ASP.NET Core微服务架构与API网关实践
C# ASP.NET Core微服务架构与API网关实践

本专题围绕 C# 在现代后端架构中的微服务实践展开,系统讲解基于 ASP.NET Core 构建可扩展服务体系的核心方法。内容涵盖服务拆分策略、RESTful API 设计、服务间通信、API 网关统一入口管理以及服务治理机制。通过真实项目案例,帮助开发者掌握构建高可用微服务系统的关键技术,提高系统的可扩展性与维护效率。

69

2026.03.11

Go高并发任务调度与Goroutine池化实践
Go高并发任务调度与Goroutine池化实践

本专题围绕 Go 语言在高并发任务处理场景中的实践展开,系统讲解 Goroutine 调度模型、Channel 通信机制以及并发控制策略。内容包括任务队列设计、Goroutine 池化管理、资源限制控制以及并发任务的性能优化方法。通过实际案例演示,帮助开发者构建稳定高效的 Go 并发任务处理系统,提高系统在高负载环境下的处理能力与稳定性。

37

2026.03.10

Kotlin Android模块化架构与组件化开发实践
Kotlin Android模块化架构与组件化开发实践

本专题围绕 Kotlin 在 Android 应用开发中的架构实践展开,重点讲解模块化设计与组件化开发的实现思路。内容包括项目模块拆分策略、公共组件封装、依赖管理优化、路由通信机制以及大型项目的工程化管理方法。通过真实项目案例分析,帮助开发者构建结构清晰、易扩展且维护成本低的 Android 应用架构体系,提升团队协作效率与项目迭代速度。

82

2026.03.09

JavaScript浏览器渲染机制与前端性能优化实践
JavaScript浏览器渲染机制与前端性能优化实践

本专题围绕 JavaScript 在浏览器中的执行与渲染机制展开,系统讲解 DOM 构建、CSSOM 解析、重排与重绘原理,以及关键渲染路径优化方法。内容涵盖事件循环机制、异步任务调度、资源加载优化、代码拆分与懒加载等性能优化策略。通过真实前端项目案例,帮助开发者理解浏览器底层工作原理,并掌握提升网页加载速度与交互体验的实用技巧。

97

2026.03.06

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
React 教程
React 教程

共58课时 | 6万人学习

国外Web开发全栈课程全集
国外Web开发全栈课程全集

共12课时 | 1万人学习

React核心原理新老生命周期精讲
React核心原理新老生命周期精讲

共12课时 | 1.1万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号