0

0

Mutiny异步处理Uni中元素的最佳实践

心靈之曲

心靈之曲

发布时间:2025-09-27 12:48:01

|

170人浏览过

|

来源于php中文网

原创

mutiny异步处理uni中元素的最佳实践

响应式编程中,处理Uni>这类结构时,一个常见需求是将列表中的每个元素独立地进行异步操作。例如,从数据库批量查询得到一个ID列表,然后需要为每个ID调用一个外部服务。直接对Uni>进行map操作通常会将整个列表作为一个整体处理,而无法实现对列表内每个元素的并发异步处理。本文将深入探讨Mutiny提供的强大工具,帮助开发者优雅地实现这一目标,避免常见陷阱。

核心问题剖析

当面对一个Uni>并希望对列表中的每个T执行异步操作时,一个常见的误区是尝试直接通过map将List转换为List>,然后使用Uni.join().all(unis).andCollectFailures()来合并结果,最后通过subscribe()进行消费。这种方法在Mutiny的链式操作中是可行的,但如果后续没有适当的机制来保持主线程的活跃,例如在简单的main方法中,程序可能会在所有异步任务完成之前退出,导致部分或全部异步操作未能执行或其结果未被观察到。

要正确地将Uni>中的每个元素转换为一个独立的异步Uni并进行并发处理,我们需要利用Mutiny的流式处理能力,或者采用阻塞机制来等待所有操作完成。

方法一:利用Multi进行非阻塞流式处理

这种方法是Mutiny推荐的、更符合响应式编程范式的处理方式。它通过将包含列表的Uni转换为一个Multi流,然后对流中的每个元素进行异步转换和合并,实现并发处理。

原理介绍

Mutiny的Multi类型非常适合处理元素流。通过以下步骤,我们可以将Uni>转换为Multi,对流中的每个元素独立应用异步转换,并利用transformToUniAndMerge实现并发处理:

  1. Uni> 转换为 Multi: 使用onItem().transformToMulti(Multi.createFrom()::iterable)将包含列表的Uni转换为一个包含列表元素的Multi。
  2. 元素异步转换: 对每个Multi中的元素,使用onItem().transformToUniAndMerge(item -> Uni.createFrom().future(processFuture(item)))将其转换为一个代表异步操作的Uni。transformToUniAndMerge会自动处理这些Uni的并发执行和结果合并。
  3. 结果处理与流终止: transformToUniAndMerge会返回一个新的Multi,其元素是所有异步操作的结果。可以通过subscribe()消费这些结果。为了确保所有异步操作完成,特别是在非Web服务器等环境中,需要额外的机制来保持主线程运行。

代码示例

以下示例演示了如何使用线程池模拟异步操作,并结合Mutiny的Multi进行非阻塞流式处理。

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import java.time.Duration;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.CountDownLatch; // 用于在main方法中等待所有异步任务完成

public class AsyncListProcessor {

    private final ExecutorService executor = Executors.newFixedThreadPool(3); // 示例用线程池,限制并发数

    // 模拟一个返回Future的耗时操作
    private Future processFuture(String s) {
        return executor.submit(() -> {
            System.out.println("开始处理 (Future): " + s + " on thread " + Thread.currentThread().getName());
            try {
                Thread.sleep(new Random().nextInt(3000) + 1000); // 模拟1-4秒延迟
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("处理中断", e);
            }
            System.out.println("结束处理 (Future): " + s + " on thread " + Thread.currentThread().getName());
            return s.toUpperCase(); // 假设处理后返回大写
        });
    }

    // 将Future封装成Uni
    private Uni processItemAsUni(String item) {
        return Uni.createFrom().future(processFuture(item));
    }

    public void processListReactive(List items, CountDownLatch latch) {
        System.out.println("\n--- 启动非阻塞流式处理 ---");
        Uni.createFrom()
            .item(items)
            // 将 Uni> 转换为 Multi
            .onItem().transformToMulti(Multi.createFrom()::iterable)
            // 对 Multi 中的每个元素进行异步处理,并合并结果
            .onItem().transformToUniAndMerge(this::processItemAsUni)
            // 订阅并打印每个完成的结果
            .subscribe()
            .with(
                s -> System.out.println("接收到结果 (Reactive): " + s),
                failure -> System.err.println("处理失败: " + failure.getMessage()),
                () -> {
                    System.out.println("所有非阻塞流式处理完成.");
                    latch.countDown(); // 通知主线程所有任务已完成
                }
            );
    }

    public static void main(String[] args) throws InterruptedException {
        AsyncListProcessor processor = new AsyncListProcessor();
        List data = List.of("apple", "banana", "cherry", "date", "elderberry");

        // 使用CountDownLatch等待所有异步任务完成
        CountDownLatch latch = new CountDownLatch(1);
        processor.processListReactive(data, latch);

        // 等待所有异步任务完成
        latch.await();
        System.out.println("主线程继续执行,所有异步任务已完成或失败。");
        processor.executor.shutdown(); // 关闭线程池
    }
}

注意事项

这种方式是非阻塞的,非常适合构建响应式应用程序。它允许任务并发执行,且不会阻塞主调用线程。在非Web服务器等环境中(如简单的main方法),为了确保程序不会在异步操作完成前退出,需要使用CountDownLatch、await()或其他同步机制来等待所有任务完成。在基于Mutiny的框架(如Quarkus)中,这些通常由框架的调度器和生命周期管理。

Figma
Figma

Figma 是一款基于云端的 UI 设计工具,可以在线进行产品原型、设计、评审、交付等工作。

下载

方法二:收集并等待所有结果 (阻塞式)

如果你的需求是等待所有异步操作完成,并将它们的结果收集到一个列表中,然后才能继续执行后续逻辑,那么可以使用阻塞式的方法。

原理介绍

这种方法同样利用Multi进行元素的异步转换,但在最后阶段,它会阻塞当前线程,直到所有异步操作完成并将结果聚合到一个列表中。

  1. Uni> 转换为 Multi: 同方法一。
  2. 元素异步转换: 同方法一,使用onItem().transformToUniAndMerge()。
  3. 收集并等待: 在transformToUniAndMerge返回的Multi上调用collect().asList()将其所有元素收集到一个Uni>中,然后使用await().indefinitely()阻塞当前线程,直到该Uni完成。

代码示例

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import java.time.Duration;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class AsyncListProcessorBlocking {

    private final ExecutorService executor = Executors.newFixedThreadPool(3); // 示例用线程池

    // 模拟一个返回Future的耗时操作 (同上)
    private Future processFuture(String s) {
        return executor.submit(() -> {
            System.out.println("开始处理 (Future): " + s + " on thread " + Thread.currentThread().getName());
            try {
                Thread.sleep(new Random().nextInt(3000) + 1000); // 模拟1-4秒延迟
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("处理中断", e);
            }
            System.out.println("结束处理 (Future): " + s + " on thread " + Thread.currentThread().getName());
            return s.toUpperCase();
        });
    }

    // 将Future封装成Uni
    private Uni processItemAsUni(String item) {
        return Uni.createFrom().future(processFuture(item));
    }

    public List processListBlocking(List items) {
        System.out.println("\n--- 启动阻塞式处理 ---");
        List results = Uni.createFrom()
            .item(items)
            .onItem().transformToMulti(Multi.createFrom()::iterable)
            .onItem().transformToUniAndMerge(this::processItemAsUni)
            .collect().asList() // 收集所有结果到一个 Uni>
            .await().indefinitely(); // 阻塞当前线程直到所有结果收集完毕

        System.out.println("--- 阻塞式处理完成 ---");
        return results;
    }

    public static void main(String[] args) {
        AsyncListProcessorBlocking processor = new AsyncListProcessorBlocking();
        List data = List.of("alpha", "beta", "gamma", "delta", "epsilon");

        List processedResults = processor.processListBlocking(data);
        System.out.println("所有处理结果 (阻塞式): " + processedResults);

        processor.executor.shutdown(); // 关闭线程池
    }
}

注意事项

await().indefinitely()会阻塞调用线程。虽然它能确保所有异步操作完成,但在响应式系统中应谨慎使用,因为它可能导致线程阻塞,降低系统的并发能力。它更适用于启动时的数据加载、测试场景或需要等待所有结果才能继续的特定批处理任务。在Web应用中,应避免在处理请求的线程中使用await(),以防阻塞请求处理。

总结与最佳实践

Mutiny提供了灵活且强大的机制来处理Uni>中的元素异步操作。选择哪种

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
线程和进程的区别
线程和进程的区别

线程和进程的区别:线程是进程的一部分,用于实现并发和并行操作,而线程共享进程的资源,通信更方便快捷,切换开销较小。本专题为大家提供线程和进程区别相关的各种文章、以及下载和课程。

502

2023.08.10

线程和进程的区别
线程和进程的区别

线程和进程的区别:线程是进程的一部分,用于实现并发和并行操作,而线程共享进程的资源,通信更方便快捷,切换开销较小。本专题为大家提供线程和进程区别相关的各种文章、以及下载和课程。

502

2023.08.10

golang map内存释放
golang map内存释放

本专题整合了golang map内存相关教程,阅读专题下面的文章了解更多相关内容。

75

2025.09.05

golang map相关教程
golang map相关教程

本专题整合了golang map相关教程,阅读专题下面的文章了解更多详细内容。

36

2025.11.16

golang map原理
golang map原理

本专题整合了golang map相关内容,阅读专题下面的文章了解更多详细内容。

60

2025.11.17

java判断map相关教程
java判断map相关教程

本专题整合了java判断map相关教程,阅读专题下面的文章了解更多详细内容。

40

2025.11.27

数据库三范式
数据库三范式

数据库三范式是一种设计规范,用于规范化关系型数据库中的数据结构,它通过消除冗余数据、提高数据库性能和数据一致性,提供了一种有效的数据库设计方法。本专题提供数据库三范式相关的文章、下载和课程。

356

2023.06.29

如何删除数据库
如何删除数据库

删除数据库是指在MySQL中完全移除一个数据库及其所包含的所有数据和结构,作用包括:1、释放存储空间;2、确保数据的安全性;3、提高数据库的整体性能,加速查询和操作的执行速度。尽管删除数据库具有一些好处,但在执行任何删除操作之前,务必谨慎操作,并备份重要的数据。删除数据库将永久性地删除所有相关数据和结构,无法回滚。

2078

2023.08.14

Python 自然语言处理(NLP)基础与实战
Python 自然语言处理(NLP)基础与实战

本专题系统讲解 Python 在自然语言处理(NLP)领域的基础方法与实战应用,涵盖文本预处理(分词、去停用词)、词性标注、命名实体识别、关键词提取、情感分析,以及常用 NLP 库(NLTK、spaCy)的核心用法。通过真实文本案例,帮助学习者掌握 使用 Python 进行文本分析与语言数据处理的完整流程,适用于内容分析、舆情监测与智能文本应用场景。

9

2026.01.27

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
React 教程
React 教程

共58课时 | 4.2万人学习

国外Web开发全栈课程全集
国外Web开发全栈课程全集

共12课时 | 1.0万人学习

React核心原理新老生命周期精讲
React核心原理新老生命周期精讲

共12课时 | 1万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号