JoinBlock本身不主动抛出异常,而是通过Completion Task传播上游异常。当任一上游数据块因异常进入Faulted状态且PropagateCompletion为true时,JoinBlock的Completion Task也会变为Faulted,需通过await joinBlock.Completion并捕获AggregateException来处理异常,确保异常沿数据流正确传递。

C#中
JoinBlock
Completion
JoinBlock
Completion
try-catch
TransformBlock
理解
JoinBlock
Completion
JoinBlock
JoinBlock
BufferBlock
TransformBlock
JoinBlock
当你连接了多个上游数据块到
JoinBlock
Faulted
JoinBlock
Completion
Faulted
JoinBlock
await
Completion
try-catch
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
public class JoinBlockExceptionExample
{
public static async Task RunExample()
{
// 假设我们有两个TransformBlock作为JoinBlock的输入源
var source1 = new TransformBlock<int, int>(async input =>
{
Console.WriteLine($"Source1 processing: {input}");
if (input == 3)
{
// 模拟一个上游异常
throw new InvalidOperationException("Source1 encountered a problem at 3!");
}
await Task.Delay(50); // 模拟异步操作
return input * 10;
});
var source2 = new TransformBlock<string, string>(async input =>
{
Console.WriteLine($"Source2 processing: {input}");
if (input == "C")
{
// 模拟另一个上游异常
throw new ArgumentException("Source2 doesn't like 'C'!");
}
await Task.Delay(50); // 模拟异步操作
return input + "X";
});
// 创建JoinBlock,期望接收int和string
var joinBlock = new JoinBlock<int, string>();
// 将上游块连接到JoinBlock
// PropagateCompletion设置为true,确保上游块的完成/异常状态会传递给JoinBlock
source1.LinkTo(joinBlock.Target1, new DataflowLinkOptions { PropagateCompletion = true });
source2.LinkTo(joinBlock.Target2, new DataflowLinkOptions { PropagateCompletion = true });
// 创建一个ActionBlock来处理JoinBlock的输出
var consumerBlock = new ActionBlock<Tuple<int, string>>(tuple =>
{
Console.WriteLine($"Consumed joined tuple: ({tuple.Item1}, {tuple.Item2})");
});
// 将JoinBlock连接到消费者块
// 同样,PropagateCompletion确保JoinBlock的完成/异常状态会传递给消费者块
joinBlock.LinkTo(consumerBlock);
// 异步发送数据到源块
var sendTask = Task.Run(async () =>
{
source1.Post(1);
source2.Post("A");
await Task.Delay(100);
source1.Post(2);
source2.Post("B");
await Task.Delay(100);
source1.Post(3); // 这会触发Source1的异常
source2.Post("C"); // 这会触发Source2的异常
await Task.Delay(100);
source1.Post(4);
source2.Post("D");
await Task.Delay(100);
source1.Complete();
source2.Complete();
});
try
{
// 等待整个数据流完成,并捕获异常
await Task.WhenAll(sendTask, consumerBlock.Completion);
Console.WriteLine("Dataflow completed successfully.");
}
catch (AggregateException ae)
{
// AggregateException是TPL Dataflow异常的常见包装
foreach (var ex in ae.Flatten().InnerExceptions)
{
Console.WriteLine($"Caught an exception in dataflow: {ex.GetType().Name} - {ex.Message}");
}
}
catch (Exception ex)
{
Console.WriteLine($"Caught a general exception: {ex.GetType().Name} - {ex.Message}");
}
}
public static void Main(string[] args)
{
RunExample().GetAwaiter().GetResult();
Console.WriteLine("Press any key to exit.");
Console.ReadKey();
}
}在这个例子里,我刻意让两个上游
TransformBlock
JoinBlock
Post
source1.Completion
source2.Completion
Faulted
PropagateCompletion = true
Faulted
JoinBlock
joinBlock.Completion
Faulted
await consumerBlock.Completion
await joinBlock.Completion
AggregateException
捕获
JoinBlock
Completion
JoinBlock
Target1
Target2
Faulted
JoinBlock
Completion
Faulted
具体操作上,你通常会在整个数据流管道的末端,
await
Completion
await joinBlock.Completion
await
AggregateException
AggregateException
AggregateException.Flatten().InnerExceptions
// 假设 joinBlock 已经设置好并连接了上游
try
{
// 等待 JoinBlock 完成,如果上游有异常,这里会捕获到 AggregateException
await joinBlock.Completion;
Console.WriteLine("JoinBlock completed without errors.");
}
catch (AggregateException ae)
{
Console.WriteLine("JoinBlock completed with errors:");
foreach (var innerEx in ae.Flatten().InnerExceptions)
{
Console.WriteLine($"- {innerEx.GetType().Name}: {innerEx.Message}");
// 这里可以根据异常类型进行不同的处理,比如记录日志、通知用户等
}
}
catch (OperationCanceledException)
{
Console.WriteLine("JoinBlock operation was cancelled.");
// 通常是 CancellationTokenSource.Cancel() 导致
}
catch (Exception ex)
{
Console.WriteLine($"An unexpected error occurred: {ex.Message}");
}值得注意的是,如果你在创建
LinkTo
PropagateCompletion = true
JoinBlock
Completion
Faulted
PropagateCompletion = true
JoinBlock
JoinBlock
Faulted
JoinBlock
JoinBlock
这有点像一个组装流水线,如果某个零件供应商出问题了,即使其他零件都到位,最终产品也无法组装完成。
JoinBlock
Completion
Faulted
Faulted
基于Asp.Net+C#+Access的网上商店系统,具有智能化、高扩展、稳定、安全等特性,并拥有超强功能,可自由添加频道,后台智能修改风格,只要懂得网站常识的站长就可以轻松利用易想商城建立起专业的大型网上书店,点卡店、鲜花店、手机店、服装店、团购网等不同类型商城。易想商城有CMS增加频道功能,能够容易的把商城系统扩展成资讯网站多风格自由切换,全站经过专业的优化处理,让你的网站在百度上轻易的就能找
0
这导致了几个关于数据流完整性的思考:
JoinBlock
JoinBlock
JoinBlock
Faulted
JoinBlock
JoinBlock
Completion
Faulted
PropagateCompletion
true
Faulted
为了维护数据流的完整性,我个人觉得,在
JoinBlock
TransformBlock
Either<TSuccess, TError>
JoinBlock
JoinBlock
在实际项目中,
JoinBlock
上游数据块抛出业务逻辑异常:
TransformBlock
TransformBlock
PropagateCompletion
true
JoinBlock
AggregateException
TransformBlock
Result<TSuccess, TError>
Either<TSuccess, TError>
JoinBlock
ActionBlock
TransformBlock
上游数据块意外完成或取消:
JoinBlock
Complete()
CancellationTokenSource.Cancel()
JoinBlock
JoinBlock
Faulted
JoinBlock
Completion
Faulted
JoinBlock
OperationCanceledException
AggregateException
Faulted
CancellationTokenSource
Cancel()
死锁或活锁(与异常处理间接相关):
JoinBlock
BoundedCapacity
BoundedCapacity
SendAsync
Post
Post
SendAsync
false
总的来说,处理
JoinBlock
以上就是C#的JoinBlock的异常处理有什么特点?的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号