BlockingCollection是线程安全的阻塞队列,底层默认封装ConcurrentQueue,支持生产/消费阻塞、容量限制及CompleteAdding()完成通知;GetConsumingEnumerable()是最简洁安全的消费方式。

BlockingCollection 是线程安全的队列容器它底层默认包装 ConcurrentQueue<t></t>,天生支持多线程生产/消费,不用手动加锁。关键在于它把“取不到数据时阻塞”和“容量满时阻塞”这两件事封装好了,比直接用 ConcurrentQueue<t></t> + ManualResetEvent 简洁得多。
- 构造时可传入容量上限,比如
new BlockingCollection<int>(boundedCapacity: 10)</int>,超限时 Add() 会阻塞
- 不设上限时(默认构造),
Add() 永不阻塞,但 Take() 在空时仍会阻塞
- 调用
CompleteAdding() 后,后续 Add() 抛 InvalidOperationException,已阻塞的 Take() 会逐个返回剩余项,之后才返回 default(T) 或抛异常(取决于是否启用 GetConsumingEnumerable())
用 GetConsumingEnumerable() 写消费者最简洁
这是最常用、也最不容易出错的消费写法,内部自动处理“空时等待”和“完成信号”。
var collection = new BlockingCollection<string>();
<p>// 生产者线程
Task.Run(() =>
{
for (int i = 0; i < 5; i++)
{
collection.Add($"item-{i}");
Thread.Sleep(100);
}
collection.CompleteAdding();
});</p><p>// 消费者线程
Task.Run(() =>
{
foreach (var item in collection.GetConsumingEnumerable())
{
Console.WriteLine($"Consumed: {item}");
Thread.Sleep(150);
}
});
-
GetConsumingEnumerable() 返回的是“消耗式枚举器”,每次 MoveNext() 都会调用 Take()
- 循环会在
CompleteAdding() 被调用且集合为空后自然退出
- 不要混用
Take() 和 GetConsumingEnumerable(),否则可能漏项或重复取
Add() 和 TryAdd() 的行为差异很关键
Add() 是阻塞式:当集合已达上限(有界)时,它会一直等,直到有空间;TryAdd() 则立即返回 bool 表示是否成功。
- 如果你希望“尽力加、不卡住”,用
TryAdd(item, timeoutMs),超时返回 false
- 若没设上限,
TryAdd() 总是返回 true(除非已 CompleteAdding())
-
Add() 在已完成添加后调用,会直接抛 InvalidOperationException:“The collection has been marked as complete with regards to additions.”
多个消费者共用同一个 BlockingCollection 安全吗
安全,而且正是设计初衷。所有 Add()、Take()、GetConsumingEnumerable() 都是线程安全的。
- 多个消费者调用
GetConsumingEnumerable(),每个都会独立消费不同元素,不会重复
- 没有“全局消费顺序保证”,但每个元素只被一个消费者拿到
- 注意:如果某个消费者处理慢,其他消费者仍能继续取,不会被拖累 —— 这是优势,但也意味着你要确保业务逻辑能容忍乱序或并行处理
真正容易忽略的是:一旦调用 CompleteAdding(),就不能再恢复。如果你需要暂停/恢复生产,得自己用 ManualResetEvent 或 SemaphoreSlim 控制,BlockingCollection 本身不提供这个能力。
它底层默认包装 ConcurrentQueue<t></t>,天生支持多线程生产/消费,不用手动加锁。关键在于它把“取不到数据时阻塞”和“容量满时阻塞”这两件事封装好了,比直接用 ConcurrentQueue<t></t> + ManualResetEvent 简洁得多。
- 构造时可传入容量上限,比如
new BlockingCollection<int>(boundedCapacity: 10)</int>,超限时Add()会阻塞 - 不设上限时(默认构造),
Add()永不阻塞,但Take()在空时仍会阻塞 - 调用
CompleteAdding()后,后续Add()抛InvalidOperationException,已阻塞的Take()会逐个返回剩余项,之后才返回default(T)或抛异常(取决于是否启用GetConsumingEnumerable())
用 GetConsumingEnumerable() 写消费者最简洁
这是最常用、也最不容易出错的消费写法,内部自动处理“空时等待”和“完成信号”。
var collection = new BlockingCollection<string>();
<p>// 生产者线程
Task.Run(() =>
{
for (int i = 0; i < 5; i++)
{
collection.Add($"item-{i}");
Thread.Sleep(100);
}
collection.CompleteAdding();
});</p><p>// 消费者线程
Task.Run(() =>
{
foreach (var item in collection.GetConsumingEnumerable())
{
Console.WriteLine($"Consumed: {item}");
Thread.Sleep(150);
}
});-
GetConsumingEnumerable()返回的是“消耗式枚举器”,每次MoveNext()都会调用Take() - 循环会在
CompleteAdding()被调用且集合为空后自然退出 - 不要混用
Take()和GetConsumingEnumerable(),否则可能漏项或重复取
Add() 和 TryAdd() 的行为差异很关键
Add() 是阻塞式:当集合已达上限(有界)时,它会一直等,直到有空间;TryAdd() 则立即返回 bool 表示是否成功。
- 如果你希望“尽力加、不卡住”,用
TryAdd(item, timeoutMs),超时返回false - 若没设上限,
TryAdd()总是返回true(除非已CompleteAdding()) -
Add()在已完成添加后调用,会直接抛InvalidOperationException:“The collection has been marked as complete with regards to additions.”
多个消费者共用同一个 BlockingCollection 安全吗
安全,而且正是设计初衷。所有 Add()、Take()、GetConsumingEnumerable() 都是线程安全的。
- 多个消费者调用
GetConsumingEnumerable(),每个都会独立消费不同元素,不会重复 - 没有“全局消费顺序保证”,但每个元素只被一个消费者拿到
- 注意:如果某个消费者处理慢,其他消费者仍能继续取,不会被拖累 —— 这是优势,但也意味着你要确保业务逻辑能容忍乱序或并行处理
真正容易忽略的是:一旦调用 CompleteAdding(),就不能再恢复。如果你需要暂停/恢复生产,得自己用 ManualResetEvent 或 SemaphoreSlim 控制,BlockingCollection 本身不提供这个能力。










