partitioner抛出invalidoperationexception的根本原因是其依赖的数据源在并行划分过程中被外部修改,导致内部状态不一致。1. 当使用partitioner.create处理非线程安全集合(如list

C#中
Partitioner抛出的
InvalidOperationException,通常发生在当你尝试在并行处理过程中,修改了作为数据源的集合时。简单来说,就是
Partitioner在划分任务时,它所依赖的底层数据源被“动了手脚”,导致其内部状态不一致,无法继续安全地执行划分操作。这通常不是一个bug,而是系统在告诉你,你正在做的事情可能会导致数据混乱或不完整。
解决方案
遇到
Partitioner引发
InvalidOperationException,核心思路是确保在并行处理期间,作为数据源的集合是稳定的、不可变的,或者至少其结构不会发生改变。
一种最直接、也最常用的方法,就是在将集合传递给
Partitioner之前,先创建一个它的“快照”或副本。例如,如果你有一个
List,并且你担心在
Parallel.ForEach执行过程中它会被修改,那么可以这样做:
ListoriginalList = new List { 1, 2, 3, 4, 5 }; // ... 某个地方可能会修改originalList // 在传递给Partitioner之前,创建一个副本 var stableSource = originalList.ToArray(); // 或者 .ToList() // 现在,使用这个稳定的副本进行并行处理 Parallel.ForEach(Partitioner.Create(stableSource), item => { // 对item进行操作 Console.WriteLine($"Processing {item}"); // 在这里不要修改originalList或stableSource });
这样做的好处是,
Partitioner操作的是一个独立的、不会被外部修改的数组或列表,从而避免了
InvalidOperationException。当然,这会引入一份内存开销,但对于大多数场景来说,这是可接受的权衡。
如果你的数据源必须是动态的,并且在并行处理期间会有并发修改,那么你需要考虑使用专门为并发设计的集合类型,比如
ConcurrentBag或
ConcurrentQueue。这些集合在设计上就考虑了多线程访问的安全性,它们通常能更好地与
Partitioner协同工作,尽管它们在某些特定场景下可能不会提供最佳的性能分区策略。
// 假设这是一个可能被并发修改的集合 ConcurrentBagconcurrentData = new ConcurrentBag (); concurrentData.Add("Alpha"); concurrentData.Add("Beta"); // ... 可以在其他线程继续添加或移除 // Partitioner.Create可以直接处理ConcurrentBag Parallel.ForEach(Partitioner.Create(concurrentData), item => { Console.WriteLine($"Processing {item}"); // 在这里对concurrentData进行修改通常是安全的,但要理解其语义 });
总而言之,问题的根源在于并行处理对数据源一致性的要求,解决方案则围绕着如何提供一个满足这一要求的数据源展开。
为什么Partitioner会抛出InvalidOperationException?
嗯,这事儿挺常见的,说实话,我也踩过这个坑。
Partitioner,特别是当它试图对一个非线程安全的集合(比如
List或
Array)进行划分时,它需要一个稳定的“视图”来确定每个并行任务应该处理哪些数据。你可以想象一下,它就像一个工头,正在给一群工人分配任务:第一个工人处理1-10号零件,第二个工人处理11-20号零件。如果在他分配任务的过程中,或者工人已经开始拿零件的时候,有人突然往生产线上加了几个零件,或者移走了几个,那工头之前算好的分配方案就全乱套了。
InvalidOperationException就是系统在告诉你:“嘿,你的集合在被我划分的时候变了!我没法保证我分出去的任务是正确的,也没法保证不会出现重复处理或者遗漏数据的情况。”这是一种“快失败”(fail-fast)机制,它不是一个bug,而是一种保护,防止你的程序在不一致的状态下继续运行,从而产生更难以调试的错误结果。
具体来说,当你使用
Partitioner.Create(myList)这样的方式时,
Partitioner可能会根据
myList的当前大小和结构来计算出各个并行任务的起始和结束索引。如果
myList在此时被另一个线程添加、删除元素,或者甚至只是改变了元素顺序,那么之前计算好的索引可能就会指向错误的位置,或者一部分数据被遗漏,另一部分被重复处理。这种不确定性是并行编程的大忌,所以系统选择直接抛出异常,强制你处理这种并发修改。
它主要发生在以下几种情况:
- 你在
Parallel.ForEach
或PLINQ操作一个List
、Dictionary
等非线程安全集合时,同时有另一个线程在向这个集合添加、删除元素。 - 你自定义了一个
OrderablePartitioner
,但你的GetPartitions
或GetDynamicPartitions
方法没有正确处理底层数据源的并发修改,或者它本身就依赖于一个非线程安全的快照。
如何安全地在并行处理中修改数据源?
这确实是一个常见的需求,但“安全地在并行处理中修改数据源”这个说法本身就有点陷阱。更准确的说法应该是:如何在并行处理中收集结果,或者在并行处理中处理动态变化的数据。直接修改作为
Partitioner数据源的集合,通常不是推荐的做法,因为这正是导致
InvalidOperationException的原因。
如果你需要在并行处理中产生新的数据,并把这些数据收集起来,你应该使用线程安全的集合来存储结果,而不是去修改原始的数据源。例如:
Listnumbers = Enumerable.Range(1, 100).ToList(); // 使用ConcurrentBag来收集并行处理的结果 ConcurrentBag results = new ConcurrentBag (); Parallel.ForEach(Partitioner.Create(numbers), number => { // 假设这是一个耗时的计算 double result = Math.Sqrt(number) * 10; results.Add(result); // 安全地将结果添加到线程安全集合中 }); // 所有并行任务完成后,可以安全地访问results foreach (var res in results) { Console.WriteLine(res); }
这里,
numbers集合在整个
Parallel.ForEach过程中是保持不变的,而
results是一个专门为并发写入设计的集合。
如果你的“数据源”本身就是动态的,比如一个队列,你希望在并行处理的同时,有新的数据不断地被添加到队列中,并且能够被处理,那么你需要从一开始就选择一个线程安全的集合作为数据源,并且
Partitioner能够很好地支持它。
ConcurrentQueue就是一个很好的例子:
ConcurrentQueuetasksQueue = new ConcurrentQueue (); tasksQueue.Enqueue("Task A"); tasksQueue.Enqueue("Task B"); // 模拟另一个线程不断添加任务 Task.Run(() => { for (int i = 0; i < 5; i++) { Thread.Sleep(500); // 模拟延迟 tasksQueue.Enqueue($"Dynamic Task {i}"); Console.WriteLine($"Added Dynamic Task {i}"); } }); // Partitioner.Create可以直接处理ConcurrentQueue // 注意:对于无限流或持续添加的队列,你可能需要一个停止条件 Parallel.ForEach(Partitioner.Create(tasksQueue), task => { Console.WriteLine($"Processing: {task}"); Thread.Sleep(200); // 模拟处理时间 }); Console.WriteLine("All tasks processed."); // 这行可能在队列完全清空前出现
在这种情况下,
Partitioner.Create(tasksQueue)能够适应
ConcurrentQueue的动态特性。然而,你需要注意的是,这种模式下,
Parallel.ForEach会在队列为空时停止,如果你的生产者线程还在持续生产,你可能需要更复杂的协调机制(比如使用
BlockingCollection)。
总的来说,避免在并行处理中直接修改作为
Partitioner数据源的非线程安全集合,而是将修改操作转移到结果收集阶段,或者从一开始就使用线程安全的集合作为数据源。
Partitioner.Create的不同重载有什么区别?
Partitioner.Create方法提供了多个重载,它们的设计是为了适应不同类型的数据源和不同的分区需求。理解这些区别对于优化并行性能和避免潜在问题至关重要。
-
Partitioner.Create
(IEnumerable source) - 这是最通用的重载,可以接受任何实现了
IEnumerable
的集合。 - 它会根据传入的
source
的具体类型,在内部选择一个合适的分区策略。例如,如果source
是List
或T[]
,它可能会使用基于索引的范围分区;如果是非索引集合,它可能会使用迭代器分区。 -
风险点: 如果
source
是一个非线程安全的集合,并且在Parallel.ForEach
执行期间被修改,就非常容易抛出InvalidOperationException
。这是最常见的触发点。
- 这是最通用的重载,可以接受任何实现了
-
Partitioner.Create
(IList list) - 这个重载专门针对实现了
IList
的集合。 - 由于
IList
提供了索引访问能力,Partitioner
可以利用这一点进行更高效的范围分区。它通常会将列表划分为连续的块,然后将这些块分配给不同的并行任务。 -
风险点: 尽管它能更高效地利用索引,但如果
list
在并行处理期间被修改(添加、删除元素),同样会引发InvalidOperationException
。
- 这个重载专门针对实现了
-
Partitioner.Create
(TSource[] array) - 这是针对数组
TSource[]
的重载。 - 数组是固定大小的,提供了最直接的索引访问。
Partitioner
可以非常高效地将数组划分为精确的、连续的子范围,这通常能提供最佳的性能。 -
安全性: 相对于
List
,数组的结构(大小)在创建后是不可变的,这使得它在作为Partitioner
的数据源时更安全,不会因为元素数量的变化而导致InvalidOperationException
。当然,如果你在并行处理中修改数组内部的元素值,那又是另一个层面的并发问题了,但至少不会因为结构变化而抛出InvalidOperationException
。
- 这是针对数组
-
Partitioner.Create(int fromInclusive, int toExclusive)
- 这个重载不是针对集合的,而是用来划分一个整数范围。
- 它非常适合当你需要并行执行一个基于索引的循环时,例如处理一个大型数组或数据库记录的某个范围。
-
示例:
Parallel.For(0, 100, i => { /* process item at index i */ });在内部就可能使用类似的分区策略。
-
Partitioner.Create
(IEnumerable source, EnumerablePartitionerOptions options) - 这个重载允许你为
IEnumerable
源指定额外的选项,比如EnumerablePartitionerOptions.NoBuffering
。 NoBuffering
选项会告诉Partitioner
不要在内部进行额外的缓冲。这在某些情况下可以减少内存使用,但可能会增加线程间的协调开销,从而影响性能。通常在处理非常大的数据集,或者数据生成速度很快时才会考虑。
- 这个重载允许你为
-
OrderablePartitioner
- 这是最灵活但也最复杂的。它是一个抽象基类,允许你实现自定义的分区逻辑。
- 当你需要对数据进行非常特殊的分区,或者你的数据源不是标准的集合类型,或者你需要实现更精细的负载均衡策略时,可以继承这个类。
-
责任: 实现
OrderablePartitioner
意味着你需要自己处理所有并发和一致性问题。如果你的自定义分区逻辑没有正确处理底层数据源的并发修改,你同样会遇到InvalidOperationException
或其他并发问题。
总的来说,选择哪个重载取决于你的数据源类型、你对性能的需求以及你是否需要处理动态或并发修改的数据。对于大多数情况,如果数据源是静态的,
ToArray()后使用
Partitioner.Create(TSource[])是最稳妥且高效的选择。如果数据源本身就是为并发设计的(如
ConcurrentBag),那么直接使用
Partitioner.Create(IEnumerable通常也能很好地工作。)










