
本文旨在详细阐述在project reactor框架中,如何优雅且非阻塞地将两个独立的flux流处理后的结果聚合为一个单一的mono对象。通过分析传统阻塞式操作的弊端,我们将重点介绍并演示mono.zipwith操作符的正确使用方法,以实现高效、响应式的并发数据聚合,从而避免在异步流程中引入阻塞点。
在响应式编程中,我们经常需要从多个独立的异步源获取数据,并将这些数据组合成一个统一的结果对象。例如,一个支付服务可能需要同时从不同的子系统获取成功交易列表和失败交易列表,然后将它们封装在一个Payments对象中返回。
考虑以下领域模型:
package org.example;
import lombok.Builder;
import lombok.Getter;
import lombok.ToString;
import java.util.List;
@Getter
@Builder
@ToString
public class Payments {
private List<SuccessAccount> successAccounts;
private List<FailedAccount> failedAccounts;
@Getter
@Builder
@ToString
public static class SuccessAccount {
private String name;
private String accountNumber;
}
@Getter
@Builder
@ToString
public static class FailedAccount {
private String name;
private String accountNumber;
private String errorCode;
}
}假设我们有两个方法分别返回成功账户和失败账户的Flux流:
public static Flux<Payments.SuccessAccount> getAccountsSucceeded() {
return Flux.just(Payments.SuccessAccount.builder()
.accountNumber("1234345")
.name("Payee1")
.build(),
Payments.SuccessAccount.builder()
.accountNumber("83673674")
.name("Payee2")
.build());
}
public static Flux<Payments.FailedAccount> getAccountsFailed() {
return Flux.just(Payments.FailedAccount.builder()
.accountNumber("12234345")
.name("Payee3")
.errorCode("8938")
.build(),
Payments.FailedAccount.builder()
.accountNumber("3342343")
.name("Payee4")
.errorCode("8938")
.build());
}一个常见的误区是尝试通过订阅这些Flux流并将结果收集到可变列表中,然后构建最终对象。例如:
// 这是一个阻塞的、不推荐的做法
public static Mono<Payments> getPaymentDataBlocking() {
Flux<Payments.SuccessAccount> accountsSucceeded = getAccountsSucceeded();
Flux<Payments.FailedAccount> accountsFailed = getAccountsFailed();
List<Payments.SuccessAccount> successAccounts = new ArrayList<>();
List<Payments.FailedAccount> failedAccounts = new ArrayList<>();
// 调用 subscribe() 会立即触发流的执行,并在当前线程等待结果,导致阻塞
accountsFailed.collectList().subscribe(failedAccounts::addAll);
accountsSucceeded.collectList().subscribe(successAccounts::addAll);
return Mono.just(Payments.builder()
.failedAccounts(failedAccounts)
.successAccounts(successAccounts)
.build());
}上述代码中的subscribe()调用是阻塞的,因为它会在当前线程等待collectList()操作完成,这违背了Reactor非阻塞的原则。在实际的Web服务或异步处理场景中,这种阻塞操作会导致线程池资源耗尽,严重影响系统吞吐量和响应性。
为了在Reactor中实现真正的非阻塞聚合,我们需要利用其提供的组合操作符。Mono.zipWith(或Mono.zip)是解决此类问题的理想选择。它允许我们将两个Mono(或多个Mono)的结果组合起来,一旦所有源Mono都完成了并产生了它们的值,就会使用一个提供的BiFunction(或Function)来处理这些值,并生成一个新的Mono结果。
具体步骤如下:
下面是使用Mono.zipWith实现的非阻塞解决方案:
package org.example;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.List;
public class Main {
public static void main(String[] args) {
// 订阅并打印结果,这是在应用程序入口点进行的操作,不会阻塞核心业务逻辑
getPaymentData().subscribe(System.out::println);
// 为了在main方法中观察异步结果,通常需要一些延迟或等待机制
// 在实际应用中,例如Spring WebFlux控制器,Mono会被框架自动订阅和处理
try {
Thread.sleep(1000); // 简单等待,仅用于演示
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public static Mono<Payments> getPaymentData() {
Flux<Payments.SuccessAccount> accountsSucceededFlux = getAccountsSucceeded();
Flux<Payments.FailedAccount> accountsFailedFlux = getAccountsFailed();
// 将Flux转换为Mono<List>
Mono<List<Payments.SuccessAccount>> successAccountsMono = accountsSucceededFlux.collectList();
Mono<List<Payments.FailedAccount>> failedAccountsMono = accountsFailedFlux.collectList();
// 使用 zipWith 组合两个 Mono 的结果
Mono<Payments> combinedPaymentsMono = failedAccountsMono.zipWith(
successAccountsMono,
(failedAccounts, successAccounts) -> Payments.builder()
.failedAccounts(failedAccounts)
.successAccounts(successAccounts)
.build()
);
return combinedPaymentsMono;
}
public static Flux<Payments.SuccessAccount> getAccountsSucceeded() {
return Flux.just(Payments.SuccessAccount.builder()
.accountNumber("1234345")
.name("Payee1")
.build(),
Payments.SuccessAccount.builder()
.accountNumber("83673674")
.name("Payee2")
.build());
}
public static Flux<Payments.FailedAccount> getAccountsFailed() {
return Flux.just(Payments.FailedAccount.builder()
.accountNumber("12234345")
.name("Payee3")
.errorCode("8938")
.build(),
Payments.FailedAccount.builder()
.accountNumber("3342343")
.name("Payee4")
.errorCode("8938")
.build());
}
}在这个改进后的getPaymentData()方法中:
通过Mono.zipWith操作符,我们能够优雅且高效地在Project Reactor中聚合来自多个Flux流的异步结果,并将其封装成一个单一的Mono对象。这种模式是构建高性能、非阻塞响应式应用程序的关键,它确保了在处理并发数据源时,应用程序能够充分利用资源并保持出色的响应能力。理解并正确运用这些组合操作符,是掌握Reactor响应式编程范式的核心。
以上就是Reactor响应式编程:非阻塞地聚合两个Flux流的结果为单个Mono对象的详细内容,更多请关注php中文网其它相关文章!
编程怎么学习?编程怎么入门?编程在哪学?编程怎么学才快?不用担心,这里为大家提供了编程速学教程(入门课程),有需要的小伙伴保存下载就能学习啦!
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号