0

0

Spring EmbeddedKafka 生产者等待消费者确认的实现方法

花韻仙語

花韻仙語

发布时间:2025-10-12 11:00:05

|

785人浏览过

|

来源于php中文网

原创

spring embeddedkafka 生产者等待消费者确认的实现方法

本文介绍了在使用 Spring EmbeddedKafka 进行集成测试时,如何实现生产者等待消费者确认消息已被处理的机制。由于 Kafka 的生产者和消费者是独立的,`acks` 参数仅保证 Broker 收到并持久化消息,并不能确保消费者成功消费。因此,我们需要自定义逻辑来实现生产者对消费者确认的等待。

在使用 Spring EmbeddedKafka 进行集成测试时,一个常见的需求是确保生产者发送的消息被消费者成功处理后再进行后续操作。然而,Kafka 的设计是生产者和消费者完全解耦的,生产者无法直接感知消费者是否已经消费了消息。即使配置了 acks=all,也只能保证消息被 Broker 成功接收并持久化,而不能保证消费者已经调用 Acknowledgement.acknowledge() 方法确认消费。

理解 Kafka 的 acks 参数

Kafka 的 acks 参数控制着生产者发送消息后,需要多少个 Broker 确认接收消息才算成功。它有以下几种取值:

  • acks=0: 生产者发送消息后,不会等待任何 Broker 的确认。这种模式下吞吐量最高,但可靠性最低,消息可能丢失。
  • acks=1: 生产者发送消息后,等待 Leader Broker 确认接收消息。这种模式下吞吐量和可靠性之间取得平衡。
  • acks=all 或 acks=-1: 生产者发送消息后,等待所有 ISR (In-Sync Replicas) Broker 确认接收消息。这种模式下可靠性最高,但吞吐量最低。

需要注意的是,acks 参数只与 Broker 的确认机制有关,与消费者是否消费消息无关。

实现生产者等待消费者确认的方案

由于 Kafka 本身不提供生产者等待消费者确认的机制,我们需要自定义逻辑来实现。以下是一些可能的方案:

  1. 使用共享状态: 可以通过共享内存、数据库、Redis 等方式,让消费者在消费消息后更新一个状态,生产者轮询检查该状态,直到状态变为已消费。

    • 优点: 实现简单。
    • 缺点: 需要引入额外的组件,轮询检查会消耗资源,可能存在延迟。
  2. 使用 Kafka Topic 作为确认通道: 消费者在成功消费消息后,向特定的 Kafka Topic 发送一条确认消息,生产者监听该 Topic,收到确认消息后认为消息已被消费。

    LongCat AI
    LongCat AI

    美团推出的AI对话问答工具

    下载
    • 优点: 利用 Kafka 的消息机制,可靠性较高。
    • 缺点: 需要配置额外的 Topic,增加了复杂度。
  3. 使用回调函数: 生产者发送消息时,传递一个回调函数给消费者,消费者在成功消费消息后执行该回调函数。

    • 优点: 实现简单,不需要额外的组件。
    • 缺点: 回调函数的执行可能会阻塞消费者线程,需要谨慎处理。

示例代码 (使用 Kafka Topic 作为确认通道)

以下是一个使用 Kafka Topic 作为确认通道的示例代码:

生产者:

@Autowired
private KafkaTemplate kafkaTemplate;

private final String confirmationTopic = "confirmation-topic";

public void sendMessageAndWaitForConfirmation(String topic, String message, String correlationId) throws ExecutionException, InterruptedException, TimeoutException {
    // 1. 发送消息到目标 Topic
    kafkaTemplate.send(topic, message);

    // 2. 监听确认 Topic,等待确认消息
    MessageListenerContainer container = createConfirmationListenerContainer(correlationId);
    container.start();

    // 3. 设置超时时间,防止无限等待
    try {
        latch.await(10, TimeUnit.SECONDS); // 假设超时时间为 10 秒
    } finally {
        container.stop(); // 停止监听器
    }

    if (latch.getCount() > 0) {
        throw new TimeoutException("Timeout waiting for confirmation message.");
    }
}

private MessageListenerContainer createConfirmationListenerContainer(String correlationId) {
    ContainerProperties containerProps = new ContainerProperties(confirmationTopic);
    CountDownLatch latch = new CountDownLatch(1);
    containerProps.setMessageListener((MessageListener) record -> {
        if (record.value().equals(correlationId)) {
            latch.countDown();
        }
    });

    ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProps);
    return container;
}

消费者:

@KafkaListener(topics = "your-topic", groupId = "your-group")
public void listen(String message, Acknowledgment acknowledgment, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
    // 1. 处理消息
    processMessage(message);

    // 2. 发送确认消息到确认 Topic
    String correlationId = key; // 使用消息的 Key 作为 Correlation ID
    kafkaTemplate.send("confirmation-topic", correlationId);

    // 3. 手动提交 Offset
    acknowledgment.acknowledge();
}

注意事项:

  • 确保 confirmationTopic 存在,并且生产者和消费者都有权限访问。
  • 使用 correlationId 来关联消息和确认消息,避免错误匹配。
  • 设置合理的超时时间,防止生产者无限等待。
  • 确保消费者在发送确认消息后,成功提交 Offset。

总结

虽然 Kafka 本身不提供生产者等待消费者确认的机制,但我们可以通过自定义逻辑来实现。选择哪种方案取决于具体的需求和场景。使用 Kafka Topic 作为确认通道是一种可靠性较高的方案,但需要额外的配置和代码。在实际应用中,需要根据具体情况选择合适的方案。

相关文章

Kafka Eagle可视化工具
Kafka Eagle可视化工具

Kafka Eagle是一款结合了目前大数据Kafka监控工具的特点,重新研发的一块开源免费的Kafka集群优秀的监控工具。它可以非常方便的监控生产环境中的offset、lag变化、partition分布、owner等,有需要的小伙伴快来保存下载体验吧!

下载

本站声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

相关专题

更多
spring框架介绍
spring框架介绍

本专题整合了spring框架相关内容,想了解更多详细内容,请阅读专题下面的文章。

104

2025.08.06

kafka消费者组有什么作用
kafka消费者组有什么作用

kafka消费者组的作用:1、负载均衡;2、容错性;3、广播模式;4、灵活性;5、自动故障转移和领导者选举;6、动态扩展性;7、顺序保证;8、数据压缩;9、事务性支持。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

167

2024.01.12

kafka消费组的作用是什么
kafka消费组的作用是什么

kafka消费组的作用:1、负载均衡;2、容错性;3、灵活性;4、高可用性;5、扩展性;6、顺序保证;7、数据压缩;8、事务性支持。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

149

2024.02.23

rabbitmq和kafka有什么区别
rabbitmq和kafka有什么区别

rabbitmq和kafka的区别:1、语言与平台;2、消息传递模型;3、可靠性;4、性能与吞吐量;5、集群与负载均衡;6、消费模型;7、用途与场景;8、社区与生态系统;9、监控与管理;10、其他特性。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

201

2024.02.23

线程和进程的区别
线程和进程的区别

线程和进程的区别:线程是进程的一部分,用于实现并发和并行操作,而线程共享进程的资源,通信更方便快捷,切换开销较小。本专题为大家提供线程和进程区别相关的各种文章、以及下载和课程。

482

2023.08.10

常用的数据库软件
常用的数据库软件

常用的数据库软件有MySQL、Oracle、SQL Server、PostgreSQL、MongoDB、Redis、Cassandra、Hadoop、Spark和Amazon DynamoDB。更多关于数据库软件的内容详情请看本专题下面的文章。php中文网欢迎大家前来学习。

973

2023.11.02

内存数据库有哪些
内存数据库有哪些

内存数据库有Redis、Memcached、Apache Ignite、VoltDB、TimesTen、H2 Database、Aerospike、Oracle TimesTen In-Memory Database、SAP HANA和ache Cassandra。更多关于内存数据库相关问题,详情请看本专题下面的文章。php中文网欢迎大家前来学习。

633

2023.11.14

mongodb和redis哪个读取速度快
mongodb和redis哪个读取速度快

redis 的读取速度比 mongodb 更快。原因包括:1. redis 使用简单的键值存储,而 mongodb 存储 json 格式的数据,需要解析和反序列化。2. redis 使用哈希表快速查找数据,而 mongodb 使用 b-tree 索引。因此,redis 在需要高性能读取操作的应用程序中是一个更好的选择。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

480

2024.04.02

Java JVM 原理与性能调优实战
Java JVM 原理与性能调优实战

本专题系统讲解 Java 虚拟机(JVM)的核心工作原理与性能调优方法,包括 JVM 内存结构、对象创建与回收流程、垃圾回收器(Serial、CMS、G1、ZGC)对比分析、常见内存泄漏与性能瓶颈排查,以及 JVM 参数调优与监控工具(jstat、jmap、jvisualvm)的实战使用。通过真实案例,帮助学习者掌握 Java 应用在生产环境中的性能分析与优化能力。

19

2026.01.20

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
进程与SOCKET
进程与SOCKET

共6课时 | 0.3万人学习

Redis+MySQL数据库面试教程
Redis+MySQL数据库面试教程

共72课时 | 6.4万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号