
本文详解如何在 Quarkus WebSocket 服务中,于 @OnMessage 异步转发至 Vert.x EventBus 及事件消费者时,完整保留并复用 MDC(Mapped Diagnostic Context)中的请求级日志上下文(如 user.id、websocket.sessionId),解决因线程切换导致的 MDC 丢失问题。
本文详解如何在 quarkus websocket 服务中,于 `@onmessage` 异步转发至 vert.x eventbus 及事件消费者时,完整保留并复用 mdc(mapped diagnostic context)中的请求级日志上下文(如 `user.id`、`websocket.sessionid`),解决因线程切换导致的 mdc 丢失问题。
在 Quarkus 中构建 WebSocket 服务时,常需将耗时逻辑(如业务校验、外部调用)异步化以避免阻塞 I/O 线程。典型做法是通过 Vert.x EventBus 发布消息,并由 @ConsumeEvent 方法在工作线程中消费处理。然而,由于 MDC 本质依赖 ThreadLocal,而 WebSocket 生命周期方法(@OnOpen/@OnMessage)运行在 Netty/Vert.x I/O 线程,而事件消费者运行在独立的工作线程池中,原始 MDC 上下文无法自动跨线程传递——这直接导致 MDC.get("user.id") 在 handleWebSocketMessages 中返回 null。
要实现可靠的上下文传播,核心思路是:在 I/O 线程中显式捕获 MDC 快照,并将其随消息一同传递;在消费端线程中主动恢复该快照。以下为经过生产验证的完整实现方案:
✅ 步骤一:定义可序列化的上下文携带消息
为确保 EventBus 消息能安全跨线程/跨节点传输,建议使用轻量、无状态的 POJO 封装原始消息与会话标识:
public class WebSocketAsyncMessage implements Serializable {
private final String sessionId;
private final String payload;
public WebSocketAsyncMessage(String sessionId, String payload) {
this.sessionId = sessionId;
this.payload = payload;
}
// getters...
}⚠️ 注意:Serializable 是 Vert.x EventBus 默认编解码要求(若启用 Jackson 编解码器可替换为 @RegisterForReflection + JSON 序列化)。
✅ 步骤二:集中管理会话级 MDC 快照
在 WebSocket 控制器中维护一个线程安全的静态映射表,以 sessionId 为键存储 MDC.getCopyOfContextMap() 的副本:
@Slf4j
@ApplicationScoped
@ServerEndpoint(value = "/users/{userId}")
public class UserWebSocketController {
// 使用 ConcurrentHashMap 保证线程安全
private static final Map<String, Map<String, String>> SESSION_MDC_CONTEXTS =
new ConcurrentHashMap<>();
private final WebsocketConnectionService websocketConnectionService;
private final Vertx vertx;
public UserWebSocketController(WebsocketConnectionService websocketConnectionService, Vertx vertx) {
this.websocketConnectionService = websocketConnectionService;
this.vertx = vertx;
}
@OnOpen
public void onOpen(Session session, @PathParam("userId") String userId) {
String sessionId = session.getId();
// 初始化 MDC
MDC.put("websocket.sessionId", sessionId);
MDC.put("user.id", userId);
log.info("New WebSocket Session opened for user {}", userId);
// 持久化当前 MDC 快照
SESSION_MDC_CONTEXTS.put(sessionId, MDC.getCopyOfContextMap());
websocketConnectionService.addConnection(userId, session);
}
@OnMessage
public void onMessage(Session session, String message, @PathParam("userId") String userId) {
String sessionId = session.getId();
// 关键:在发送前恢复当前会话的 MDC(确保日志含上下文)
restoreSessionMDC(sessionId);
log.info("Received message: {}", message);
// 将 sessionId 与消息一起发送,供消费端还原上下文
vertx.eventBus().send("websocket.message.new",
new WebSocketAsyncMessage(sessionId, message));
}
@OnClose
public void onClose(Session session, @PathParam("userId") String userId) {
String sessionId = session.getId();
restoreSessionMDC(sessionId);
log.info("WebSocket Session closed for user {}", userId);
// 清理资源:移除 MDC 快照 & 连接
SESSION_MDC_CONTEXTS.remove(sessionId);
websocketConnectionService.removeSession(userId);
}
@OnError
public void onError(Session session, @PathParam("userId") String userId, Throwable throwable) {
String sessionId = session.getId();
restoreSessionMDC(sessionId);
log.error("Error in WebSocket session for user {}", userId, throwable);
websocketConnectionService.removeSession(userId);
}
// 工具方法:恢复指定会话的 MDC 上下文
public static void restoreSessionMDC(String sessionId) {
Map<String, String> context = SESSION_MDC_CONTEXTS.get(sessionId);
if (context != null) {
MDC.setContextMap(context);
} else {
MDC.clear(); // 防止残留旧上下文
}
}
}✅ 步骤三:在事件消费者中主动还原 MDC
在 UserService 的事件处理器中,先调用 UserWebSocketController.restoreSessionMDC(...),再执行业务逻辑:
@Slf4j
@ApplicationScoped
public class UserService {
private final WebsocketConnectionService websocketConnectionService;
private final Vertx vertx;
public UserService(WebsocketConnectionService websocketConnectionService, Vertx vertx) {
this.websocketConnectionService = websocketConnectionService;
this.vertx = vertx;
}
@ConsumeEvent("websocket.message.new")
public Uni<Void> handleWebSocketMessages(WebSocketAsyncMessage asyncMessage) {
// ✅ 关键:立即还原该会话的 MDC 上下文
UserWebSocketController.restoreSessionMDC(asyncMessage.getSessionId());
// 此时 MDC 已就绪,可安全读取
String userId = MDC.get("user.id");
log.info("Processing message for user {} with payload: {}", userId, asyncMessage.getPayload());
// 执行实际业务逻辑(例如:持久化、通知、调用其他服务)
// ... business logic ...
return Uni.createFrom().voidItem();
}
}? 关键注意事项与最佳实践
- 线程安全性:ConcurrentHashMap 是必须的——WebSocket 多个连接可能并发触发 onOpen/onClose。
- 内存泄漏防护:务必在 @OnClose 和 @OnError 中调用 SESSION_MDC_CONTEXTS.remove(sessionId),避免长期持有已断开连接的上下文。
- 日志一致性:所有 log.*() 调用前应确保 restoreSessionMDC() 已执行,否则日志将缺失关键诊断字段。
- 扩展性考量:若需支持分布式部署,SESSION_MDC_CONTEXTS 应替换为 Redis 或 Infinispan 等共享存储(本例适用于单节点场景)。
- 替代方案提示:Quarkus 2.13+ 提供了 @WithSpan 与 OpenTelemetry 集成,对链路追踪更友好;但 MDC 仍是最轻量、最直接的日志上下文注入方式。
通过以上设计,你能在完全异步的 WebSocket 消息流中,稳定维持用户身份、会话标识等关键日志维度,大幅提升可观测性与问题排查效率。










