双向流式 RPC是gRPC中唯一支持客户端与服务器同时持续双向通信的模式,双方各自维护独立读写通道,可随时异步读写且互不阻塞,需在.proto中用两个stream关键字定义,并由应用层自行约定消息协议。

什么是 gRPC 的双向流式 RPC?
gRPC 的 BidirectionalStreamingRpc 是唯一支持服务器和客户端同时持续发消息的调用模式。它不是“先发后收”或“发一次收多次”,而是双方各自维护独立的读写通道,可随时 WriteAsync、随时 ReadAsync,彼此不阻塞。
常见误用是把它当“增强版客户端流”或“带回传的服务器流”——实际它没有隐含顺序约定,应用层必须自行定义协议(比如用 messageType 字段区分心跳、数据、结束信号)。
定义 .proto 时必须用 stream 修饰符两次
服务端流、客户端流、双向流在 .proto 中全靠 stream 出现次数区分,少一个关键字就生成完全不同的 C# 方法签名。
-
rpc Chat(stream ChatMessage) returns (ChatMessage)→ 客户端流(IAsyncEnumerable入,单次返回) -
rpc Chat(ChatMessage) returns (stream ChatMessage)→ 服务器流(单次入,IAsyncStreamReader出) -
rpc Chat(stream ChatMessage) returns (stream ChatMessage)→ 双向流(两个stream)→ 生成Task Chat(IAsyncStreamReader, IServerStreamWriter , ServerCallContext)
如果生成的 C# 类里没看到 IServerStreamWriter 参数,八成是 .proto 少写了一个 stream。
客户端发起双向流:别卡在 await foreach 里等响应
典型错误是写成:
var call = client.Chat();
await call.RequestStream.WriteAsync(new ChatMessage { Text = "hi" });
await foreach (var msg in call.ResponseStream.ReadAllAsync()) { /* ... */ } // ❌ 阻塞在此,后续 WriteAsync 不会执行
正确做法是并发驱动读写:
- 用
Task.Run(() => WriteLoop(call.RequestStream))单独发消息 - 用
await foreach在主线程收消息 - 或用
Channel+Writer/Reader解耦生产和消费
注意 RequestStream.CompleteAsync() 必须显式调用,否则服务端 ReadAsync() 永远不会返回 null。
服务端处理双向流:避免在循环里 await 所有操作
最易被忽略的是线程调度开销。下面这段代码在高并发下会迅速堆积 Task:
while (await requestStream.ReadAsync(out var req)) {
var resp = Process(req);
await responseStream.WriteAsync(resp); // ❌ 每次 WriteAsync 都可能跨线程调度
}
优化方向:
- 批量写入:缓存多个响应,用
WriteBatchAsync(需自实现或用Channel聚合) - 取消检查:在循环头部加
context.CancellationToken.ThrowIfCancellationRequested(),否则客户端断连后服务端仍空转 - 异常隔离:
ReadAsync抛异常时,WriteAsync可能已失败,不要假设响应一定送达
真正难调试的不是连接断开,而是某次 WriteAsync 因网络抖动超时后,后续所有写入都静默失败——因为 gRPC 的 IServerStreamWriter 不抛异常,只记录日志。









