0

0

Mutiny异步流处理:高效并发处理Uni中的元素

碧海醫心

碧海醫心

发布时间:2025-09-27 12:43:15

|

971人浏览过

|

来源于php中文网

原创

Mutiny异步流处理:高效并发处理Uni中的元素

本文深入探讨了如何在Mutiny框架中异步处理Uni中的每个元素。通过将Uni转换为Multi流,并利用onItem().transformToUniAndMerge()操作符,可以实现列表内元素的并发异步处理。文章提供了两种主要解决方案:结合Vert.x Unit进行非阻塞测试,以及使用collect().asList().await().indefinitely()进行阻塞式结果收集,并强调了相关注意事项和最佳实践。

1. 问题背景与常见误区

响应式编程中,我们经常会遇到需要处理包含多个元素的异步操作。例如,有一个uni>,我们希望对列表中的每个字符串都执行一个耗时的异步任务,并最终收集或处理所有任务的结果。

一个常见的尝试是使用map将List转换为List>,然后通过Uni.join().all(unis).andCollectFailures()来合并这些Uni。然而,这种方法可能无法达到预期的并发处理效果,或者在短生命周期的程序(如单元测试)中,由于主线程过早退出,导致异步任务未能完成就被终止,从而给人一种“只处理了第一个元素”的错觉。

问题的核心在于,Uni>本身代表的是一个单值流,其值是一个完整的列表。如果想对列表中的每个元素进行异步操作,并将其视为独立的响应式事件,就需要将这个列表“展开”成一个可以逐个处理的流。Mutiny提供了Multi类型来处理零到N个元素的流,这正是解决此类问题的关键。

2. Mutiny异步流处理核心:Uni与Multi

Mutiny是Quarkus等框架中广泛使用的响应式编程库,它提供了两种核心类型:

  • Uni: 代表一个异步操作,最终会发出0个或1个元素,或者一个失败事件。
  • Multi: 代表一个异步操作流,可以发出0到N个元素,或者一个失败事件,最终会发出完成事件。

要实现对Uni>中每个元素的异步并发处理,我们需要将Uni>首先转换为一个Multi,这样列表中的每个字符串就成为了Multi流中的一个独立事件。然后,我们可以对这个Multi流中的每个事件应用异步转换。

3. 解决方案一:在测试环境中优雅地处理异步流(结合Vert.x Unit)

在单元测试或需要非阻塞等待所有异步操作完成的场景中,我们可以利用Multi的特性和onTermination().invoke()回调来确保所有任务执行完毕。以下示例结合了Vert.x Unit,它提供了一个Async机制来管理异步测试的生命周期。

赣极购物商城网店建站软件系统
赣极购物商城网店建站软件系统

大小仅1兆左右 ,足够轻便的商城系统; 易部署,上传空间即可用,安全,稳定; 容易操作,登陆后台就可设置装饰网站; 并且使用异步技术处理网站数据,表现更具美感。 前台呈现页面,兼容主流浏览器,DIV+CSS页面设计; 如果您有一定的网页设计基础,还可以进行简易的样式修改,二次开发, 发布新样式,调整网站结构,只需修改css目录中的css.css文件即可。 商城网站完全独立,网站源码随时可供您下载

下载
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Vertx;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import java.time.Duration;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@ExtendWith(VertxExtension.class)
public class AsyncListProcessingTest {

    // 模拟一个异步操作,返回一个Uni
    private Uni processItemAsync(String item, Random random) {
        final int duration = (random.nextInt(5) + 1) * 1000; // 随机延迟1-5秒
        System.out.println("Starting process for: " + item + ", duration: " + duration + "ms");
        return Uni.createFrom().item(item)
                .onItem().delayIt().by(Duration.ofMillis(duration))
                .invoke(() -> System.out.println("Finished process for: " + item));
    }

    @Test
    public void testAsyncProcessingWithVertxUnit(VertxTestContext context) {
        Random random = new Random();
        // Vert.x Unit的Async对象,用于通知测试框架异步操作何时完成
        context.verify(() -> { // 确保在VertxTestContext的上下文中执行
            Uni.createFrom()
                    .item(List.of("a", "b", "c")) // 初始的Uni>
                    // 1. 将Uni>转换为Multi
                    .onItem().transformToMulti(Multi.createFrom()::iterable)
                    // 2. 对Multi中的每个元素应用异步转换,并将结果合并回Multi
                    .onItem().transformToUniAndMerge(s -> processItemAsync(s, random))
                    // 3. 订阅Multi流,处理每个完成的元素
                    .subscribe()
                    .with(
                            s -> System.out.println("Printing result: " + s), // 成功处理每个元素
                            context::failNow, // 任何错误导致流失败
                            context::completeNow // 流完成,通知VertxTestContext测试结束
                    );
        });
    }
}

代码解释:

  1. Uni.createFrom().item(List.of("a", "b", "c")): 创建一个包含字符串列表的初始Uni。
  2. .onItem().transformToMulti(Multi.createFrom()::iterable): 这是将Uni>转换为Multi的关键步骤。它将Uni发出的列表内容展开,使得列表中的每个元素都成为一个新的Multi事件。
  3. .onItem().transformToUniAndMerge(s -> processItemAsync(s, random)): 这是实现并发异步处理的核心。
    • transformToUni: 对Multi中的每个元素s,都会调用processItemAsync(s, random)方法,该方法返回一个Uni
    • andMerge: Mutiny会并发地订阅并执行这些由transformToUni创建的Uni。当任何一个Uni完成时,它的结果会被立即合并到输出的Multi流中。这意味着结果的顺序可能与原始列表的顺序不同,而是取决于哪个异步操作首先完成。
  4. .subscribe().with(...): 订阅最终的Multi流。
    • 第一个Lambda表达式处理Multi中发出的每个成功结果。
    • 第二个Lambda表达式处理流中的任何错误。
    • 第三个Lambda表达式在流成功完成时被调用,这里我们使用context::completeNow来通知Vert.x Unit测试已成功完成所有异步操作。

4. 解决方案二:阻塞式等待所有异步结果

在某些场景下,例如在命令行工具或需要等待所有异步操作完成后才能继续主程序执行时,我们可以选择阻塞当前线程直到所有结果都被收集。

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;

import java.time.Duration;
import java.util.List;
import java.util.Random;

public class BlockingAsyncListProcessing {

    private static Uni processItemAsync(String item, Random random) {
        final int duration = (random.nextInt(5) + 1) * 1000; // 随机延迟1-5秒
        return Uni.createFrom().item(item)
                .onItem().delayIt().by(Duration.ofMillis(duration))
                .invoke(() -> System.out.println("Letter: " + item + ", duration in ms: " + duration));
    }

    public static void main(String[] args) {
        Random random = new Random();

        System.out.println("Starting blocking asynchronous processing...");

        List results = Uni.createFrom()
                .item(List.of("a", "b", "c")) // 初始的Uni>
                // 1. 将Uni>转换为Multi
                .onItem().transformToMulti(Multi.createFrom()::iterable)
                // 2. 对Multi中的每个元素应用异步转换,并将结果合并回Multi
                .onItem().transformToUniAndMerge(s -> processItemAsync(s, random))
                // 3. 可选:处理每个完成的元素
                .onItem().invoke(s -> System.out.println("Printing collected item: " + s))
                // 4. 将Multi中的所有元素收集到一个列表中
                .collect().asList()
                // 5. 阻塞当前线程,直到Uni>完成并返回结果
                .await().indefinitely();

        System.out.println("All items processed. Collected results: " + results);
    }
}

代码解释:

  1. 前两步与解决方案一相同:将Uni>转换为Multi,然后使用onItem().transformToUniAndMerge()并发处理每个元素。
  2. .collect().asList(): 这个操作符将Multi流中所有发出的元素收集到一个List中,并最终返回一个Uni>。这个Uni会在源Multi完成时发出包含所有收集元素的列表。
  3. .await().indefinitely(): 这是阻塞操作。它会阻塞当前线程,直到上游的Uni>发出其结果(即所有异步操作完成且结果被收集到列表中)。indefinitely()表示无限期等待。

5. 注意事项与最佳实践

  • 非阻塞优先: 尽可能采用非阻塞的响应式模式(如解决方案一)。await()操作会阻塞当前线程,在生产环境中应谨慎使用,尤其是在I/O密集型或Web应用中,它可能导致线程饥饿和性能问题。它更适合于启动代码、测试或需要同步等待所有异步任务完成的特定场景。
  • 错误处理: 在transformToUniAndMerge内部创建的Uni中,以及最终的subscribe().with()方法中,都应该有完善的错误处理逻辑。Mutiny提供了丰富的错误处理操作符,如onFailure().recoverWith()、onFailure().retry()等。
  • 并发度: transformToUniAndMerge会并发处理任务,但实际的并发度可能受限于底层线程池配置、系统资源以及具体异步操作的实现。如果需要精细控制并发度,可以考虑使用transformToUniAndMerge(concurrency, ...)变体。
  • 顺序保证: transformToUniAndMerge不保证结果的顺序与原始列表的顺序一致。如果需要保持顺序,可以考虑使用transformToUniAndConcatenate或在收集后手动排序。
  • 资源管理: 确保异步操作中使用的任何外部资源(如数据库连接、文件句柄)都能得到妥善管理和释放。

6. 总结

通过Mutiny的Multi类型和onItem().transformToUniAndMerge()操作符,我们可以有效地将Uni>中的每个元素转换为独立的异步任务并进行并发处理。根据应用场景的不同,我们可以选择非阻塞的订阅模式(适用于响应式系统和测试)或阻塞式的await()模式(适用于需要同步等待结果的特定场景)。理解并正确运用这些Mutiny操作符是构建高效、健壮的响应式应用程序的关键。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
string转int
string转int

在编程中,我们经常会遇到需要将字符串(str)转换为整数(int)的情况。这可能是因为我们需要对字符串进行数值计算,或者需要将用户输入的字符串转换为整数进行处理。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

401

2023.08.02

js 字符串转数组
js 字符串转数组

js字符串转数组的方法:1、使用“split()”方法;2、使用“Array.from()”方法;3、使用for循环遍历;4、使用“Array.split()”方法。本专题为大家提供js字符串转数组的相关的文章、下载、课程内容,供大家免费下载体验。

298

2023.08.03

js截取字符串的方法
js截取字符串的方法

js截取字符串的方法有substring()方法、substr()方法、slice()方法、split()方法和slice()方法。本专题为大家提供字符串相关的文章、下载、课程内容,供大家免费下载体验。

212

2023.09.04

java基础知识汇总
java基础知识汇总

java基础知识有Java的历史和特点、Java的开发环境、Java的基本数据类型、变量和常量、运算符和表达式、控制语句、数组和字符串等等知识点。想要知道更多关于java基础知识的朋友,请阅读本专题下面的的有关文章,欢迎大家来php中文网学习。

1496

2023.10.24

字符串介绍
字符串介绍

字符串是一种数据类型,它可以是任何文本,包括字母、数字、符号等。字符串可以由不同的字符组成,例如空格、标点符号、数字等。在编程中,字符串通常用引号括起来,如单引号、双引号或反引号。想了解更多字符串的相关内容,可以阅读本专题下面的文章。

622

2023.11.24

java读取文件转成字符串的方法
java读取文件转成字符串的方法

Java8引入了新的文件I/O API,使用java.nio.file.Files类读取文件内容更加方便。对于较旧版本的Java,可以使用java.io.FileReader和java.io.BufferedReader来读取文件。在这些方法中,你需要将文件路径替换为你的实际文件路径,并且可能需要处理可能的IOException异常。想了解更多java的相关内容,可以阅读本专题下面的文章。

572

2024.03.22

php中定义字符串的方式
php中定义字符串的方式

php中定义字符串的方式:单引号;双引号;heredoc语法等等。想了解更多字符串的相关内容,可以阅读本专题下面的文章。

586

2024.04.29

go语言字符串相关教程
go语言字符串相关教程

本专题整合了go语言字符串相关教程,阅读专题下面的文章了解更多详细内容。

170

2025.07.29

c++ 根号
c++ 根号

本专题整合了c++根号相关教程,阅读专题下面的文章了解更多详细内容。

70

2026.01.23

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Kotlin 教程
Kotlin 教程

共23课时 | 2.9万人学习

C# 教程
C# 教程

共94课时 | 7.5万人学习

Java 教程
Java 教程

共578课时 | 50.9万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号