0

0

Quarkus WebSocket 中异步场景下的 MDC 上下文透传实践指南

霞舞

霞舞

发布时间:2026-02-19 09:38:11

|

257人浏览过

|

来源于php中文网

原创

Quarkus WebSocket 中异步场景下的 MDC 上下文透传实践指南

本文详解如何在 Quarkus WebSocket 服务中,于 @OnMessage 异步处理及 Vert.x EventBus 事件消费阶段,可靠地透传 MDC(Mapped Diagnostic Context)日志上下文,确保 user.id、websocket.sessionId 等关键追踪字段全程可用。

本文详解如何在 quarkus websocket 服务中,于 `@onmessage` 异步处理及 vert.x eventbus 事件消费阶段,可靠地透传 mdc(mapped diagnostic context)日志上下文,确保 `user.id`、`websocket.sessionid` 等关键追踪字段全程可用。

在 Quarkus 的 WebSocket 实现中,MDC(如 Logback 的 MDC)本质上基于 ThreadLocal,其生命周期严格绑定于当前线程。而 WebSocket 的 @OnMessage 回调虽由容器线程触发,但一旦消息被投递至 Vert.x EventBus(如 vertx.eventBus().send()),后续的 @ConsumeEvent 处理将运行在 Vert.x 工作线程池中——此时原始线程的 MDC 上下文已丢失,导致 MDC.get("user.id") 返回 null。

要解决这一问题,核心思路是:主动捕获、显式存储、按需恢复。不能依赖自动传播(Quarkus 未为 WebSocket + EventBus 组合提供开箱即用的 MDC 跨线程透传),而应将 MDC 上下文快照与 WebSocket 会话强绑定,并在异步处理入口处手动还原。

✅ 推荐实现方案:基于 Session ID 的 MDC 快照管理

我们采用静态线程安全映射(ConcurrentHashMap)持久化每个活跃 WebSocket 会话的 MDC 快照,并在关键生命周期节点进行存取操作:

紫东太初
紫东太初

中科院和武汉AI研究院推出的新一代大模型

下载
// UserWebSocketController.java
@Slf4j
@ApplicationScoped
@ServerEndpoint(value = "/users/{userId}")
public class UserWebSocketController {

    // ⚠️ 线程安全的 MDC 快照存储(Key: session.id)
    private static final Map<String, Map<String, String>> SESSION_MDC_MAP =
            new ConcurrentHashMap<>();

    private final WebsocketConnectionService websocketConnectionService;
    private final Vertx vertx;

    public UserWebSocketController(WebsocketConnectionService websocketConnectionService, Vertx vertx) {
        this.websocketConnectionService = websocketConnectionService;
        this.vertx = vertx;
    }

    @OnOpen
    public void onOpen(Session session, @PathParam("userId") String userId) {
        // 1. 初始化 MDC
        MDC.put("websocket.sessionId", session.getId());
        MDC.put("user.id", userId);
        log.info("New WebSocket Session opened for user {}", userId);

        // 2. 持久化当前 MDC 快照(供后续异步使用)
        storeSessionMDC(session.getId());

        websocketConnectionService.addConnection(userId, session);
    }

    @OnMessage
    public void onMessage(Session session, String message, @PathParam("userId") String userId) {
        // 3. 关键:恢复当前会话的 MDC(保障 onMessage 内日志可追溯)
        restoreSessionMDC(session.getId());
        log.info("New message received: {}", message);

        // 4. 将 sessionId 显式携带至事件负载,用于下游还原
        var asyncMsg = new AsyncWebSocketMessage(session.getId(), message);
        vertx.eventBus().send("websocket.message.new", asyncMsg);
    }

    @OnClose
    public void onClose(Session session, @PathParam("userId") String userId) {
        restoreSessionMDC(session.getId());
        log.info("WebSocket Session closed for user {}", userId);
        removeSessionMDC(session.getId()); // 清理资源
        websocketConnectionService.removeSession(userId);
    }

    @OnError
    public void onError(Session session, @PathParam("userId") String userId, Throwable throwable) {
        restoreSessionMDC(session.getId());
        log.error("Error in WebSocket session for user {}", userId, throwable);
        removeSessionMDC(session.getId());
        websocketConnectionService.removeSession(userId);
    }

    // —— MDC 快照工具方法 ——
    private static void storeSessionMDC(String sessionId) {
        SESSION_MDC_MAP.put(sessionId, MDC.getCopyOfContextMap());
    }

    public static void restoreSessionMDC(String sessionId) {
        Map<String, String> contextMap = SESSION_MDC_MAP.get(sessionId);
        if (contextMap != null) {
            MDC.setContextMap(contextMap);
        } else {
            MDC.clear(); // 防止残留
        }
    }

    private static void removeSessionMDC(String sessionId) {
        SESSION_MDC_MAP.remove(sessionId);
        MDC.clear();
    }

    // 事件载体(需可序列化,适配 Vert.x EventBus)
    public static class AsyncWebSocketMessage implements Serializable {
        private final String sessionId;
        private final String payload;

        public AsyncWebSocketMessage(String sessionId, String payload) {
            this.sessionId = sessionId;
            this.payload = payload;
        }

        public String getSessionId() { return sessionId; }
        public String getPayload() { return payload; }
    }
}

? 在事件消费者中还原 MDC

UserService 的 @ConsumeEvent 方法需在执行业务逻辑前,主动调用 UserWebSocketController.restoreSessionMDC(...),确保日志上下文就位:

// UserService.java
@Slf4j
@ApplicationScoped
public class UserService {

    private final WebsocketConnectionService websocketConnectionService;
    private final Vertx vertx;

    public UserService(WebsocketConnectionService websocketConnectionService, Vertx vertx) {
        this.websocketConnectionService = websocketConnectionService;
        this.vertx = vertx;
    }

    @ConsumeEvent("websocket.message.new")
    public Uni<Void> handleWebSocketMessages(UserWebSocketController.AsyncWebSocketMessage msg) {
        // ✅ 关键:在业务逻辑前恢复 MDC
        UserWebSocketController.restoreSessionMDC(msg.getSessionId());

        // 此时 MDC 已就绪
        String userId = MDC.get("user.id");
        log.info("Processing message for user {} with payload: {}", userId, msg.getPayload());

        // 执行实际业务逻辑(如 DB 查询、外部调用等)
        // ... your business logic ...

        return Uni.createFrom().voidItem();
    }
}

⚠️ 注意事项与最佳实践

  • 内存泄漏防护:务必在 @OnClose 和 @OnError 中调用 removeSessionMDC(),避免 SESSION_MDC_MAP 持续增长。生产环境建议增加 TTL 或结合 WeakReference 进一步优化。
  • 序列化兼容性:AsyncWebSocketMessage 必须 implements Serializable,且所有字段需可序列化(Vert.x EventBus 默认使用 Jackson 序列化,若使用 Protobuf 等需额外配置)。
  • 线程安全性:ConcurrentHashMap 保证了多线程存取安全,但 MDC.setContextMap() 本身是线程局部的,无需额外同步。
  • 日志一致性:@OnOpen/@OnMessage/@OnClose 中均调用 restoreSessionMDC(),可确保整个 WebSocket 生命周期内日志链路完整。
  • 替代方案考量:若项目已引入 Micrometer Tracing(如 OpenTelemetry),可优先使用 TracedExecutor 或 Tracer.withSpanInScope() 实现分布式追踪上下文透传,MDC 则作为补充的日志增强字段。

通过该方案,你能在 Quarkus WebSocket 的异步事件流中,稳定维持用户身份、会话标识等诊断信息,大幅提升日志可追溯性与故障排查效率。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
什么是分布式
什么是分布式

分布式是一种计算和数据处理的方式,将计算任务或数据分散到多个计算机或节点中进行处理。本专题为大家提供分布式相关的文章、下载、课程内容,供大家免费下载体验。

394

2023.08.11

分布式和微服务的区别
分布式和微服务的区别

分布式和微服务的区别在定义和概念、设计思想、粒度和复杂性、服务边界和自治性、技术栈和部署方式等。本专题为大家提供分布式和微服务相关的文章、下载、课程内容,供大家免费下载体验。

246

2023.10.07

c语言中null和NULL的区别
c语言中null和NULL的区别

c语言中null和NULL的区别是:null是C语言中的一个宏定义,通常用来表示一个空指针,可以用于初始化指针变量,或者在条件语句中判断指针是否为空;NULL是C语言中的一个预定义常量,通常用来表示一个空值,用于表示一个空的指针、空的指针数组或者空的结构体指针。

244

2023.09.22

java中null的用法
java中null的用法

在Java中,null表示一个引用类型的变量不指向任何对象。可以将null赋值给任何引用类型的变量,包括类、接口、数组、字符串等。想了解更多null的相关内容,可以阅读本专题下面的文章。

786

2024.03.01

session失效的原因
session失效的原因

session失效的原因有会话超时、会话数量限制、会话完整性检查、服务器重启、浏览器或设备问题等等。详细介绍:1、会话超时:服务器为Session设置了一个默认的超时时间,当用户在一段时间内没有与服务器交互时,Session将自动失效;2、会话数量限制:服务器为每个用户的Session数量设置了一个限制,当用户创建的Session数量超过这个限制时,最新的会覆盖最早的等等。

327

2023.10.17

session失效解决方法
session失效解决方法

session失效通常是由于 session 的生存时间过期或者服务器关闭导致的。其解决办法:1、延长session的生存时间;2、使用持久化存储;3、使用cookie;4、异步更新session;5、使用会话管理中间件。

773

2023.10.18

cookie与session的区别
cookie与session的区别

本专题整合了cookie与session的区别和使用方法等相关内容,阅读专题下面的文章了解更详细的内容。

97

2025.08.19

线程和进程的区别
线程和进程的区别

线程和进程的区别:线程是进程的一部分,用于实现并发和并行操作,而线程共享进程的资源,通信更方便快捷,切换开销较小。本专题为大家提供线程和进程区别相关的各种文章、以及下载和课程。

675

2023.08.10

pixiv网页版官网登录与阅读指南_pixiv官网直达入口与在线访问方法
pixiv网页版官网登录与阅读指南_pixiv官网直达入口与在线访问方法

本专题系统整理pixiv网页版官网入口及登录访问方式,涵盖官网登录页面直达路径、在线阅读入口及快速进入方法说明,帮助用户高效找到pixiv官方网站,实现便捷、安全的网页端浏览与账号登录体验。

561

2026.02.13

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
swoole入门物联网开发与实战
swoole入门物联网开发与实战

共15课时 | 1.3万人学习

swoole项目实战(第二季)
swoole项目实战(第二季)

共15课时 | 1.3万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号