我有一个TransformManyBlock<Tin, Tout>和在运行时添加消费者(ActionBlocks)通过LinkTo(...).
TransformManyBlock是正确的数据流块,消耗元素,转换它们,然后输出(输入相同的数字元素作为输入)给几个消费者(每个链接到消费者消费相同的元素,如广播)?我故意不选择BroadCastBlock,因为它似乎无法像BufferBlock一样转换元素.
我想知道在运行时如何取消消费者链接(ActionBlocks)?就我所见,LinkTo()似乎没有提供这样的功能.
我正在使用TPL Dataflow库中的TransformBlock,我已经意识到在转换期间抛出异常时,我在"接收"方法中得到一个通用异常,但没有提到原始异常.
在这段代码中:
Func<Int32, Task<String>> transformer = async i => { await Task.Yield(); throw new ArgumentException("whatever error"); };
TransformBlock<Int32, String> transform = new TransformBlock<int, string>(transformer);
transform.Post(1);
try
{
var x = await transform.ReceiveAsync();
}
catch (Exception ex)
{
// catch
}
Run Code Online (Sandbox Code Playgroud)
例外ex包含:
System.InvalidOperationException was caught
HResult=-2146233079
Message=The source completed without providing data to receive.
Source=System.Threading.Tasks.Dataflow
StackTrace:
at System.Threading.Tasks.Dataflow.Internal.Common.InitializeStackTrace(Exception exception)
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at …Run Code Online (Sandbox Code Playgroud) Action直接指定或Func<Task>使用.NET TPL Dataflow有ActionBlock什么区别?
直接行动:
new ActionBlock<Message[]>(x => DoSomething(x))
Run Code Online (Sandbox Code Playgroud)
任务:
new ActionBlock<Message[]>(x => Task.Run(() => DoSomething(x)))
Run Code Online (Sandbox Code Playgroud)
我试图了解并行执行方面的差异(MaxDegreeOfParallelism> 1).
我对TPL Dataflow这个话题很陌生.在C#中的Concurrency一书中,我测试了以下示例.我无法弄清楚为什么没有输出应该是2*2-2=2;
static void Main(string[] args)
{
//Task tt = test();
Task tt = test1();
Console.ReadLine();
}
static async Task test1()
{
try
{
var multiplyBlock = new TransformBlock<int, int>(item =>
{
if (item == 1)
throw new InvalidOperationException("Blech.");
return item * 2;
});
var subtractBlock = new TransformBlock<int, int>(item => item - 2);
multiplyBlock.LinkTo(subtractBlock,
new DataflowLinkOptions { PropagateCompletion = true });
multiplyBlock.Post(2);
await subtractBlock.Completion;
int temp = subtractBlock.Receive();
Console.WriteLine(temp);
}
catch (AggregateException e) …Run Code Online (Sandbox Code Playgroud) 来自 TPL 文档
与 一样
ActionBlock<TInput>,TransformBlock<TInput,TOutput>默认一次处理一条消息,保持严格的 FIFO 顺序。
但是,在多线程场景中,即如果多个线程“同时”执行SendAsync,然后通过调用“等待”结果ReceiveAsync,我们如何保证将某些内容发布到 的线程TransformBlock<TInput,TOutput>实际获得它正在等待的预期结果为了?
在我的实验中,似乎“保证”我想要的结果的方法是添加 option BoundedCapacity = 1。至少线程在发送和接收时仍然不会被阻塞。
如果我不这样做,某些线程将收到用于另一个线程的结果。
在这个特定用例中,这是正确的方法吗?
下面是一些代码,说明了我的担忧:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace ConsoleTransformBlock
{
class Program
{
private readonly static TransformBlock<int, int> _pipeline;
static Program()
{
_pipeline = new TransformBlock<int, int>(async (input) =>
{
await Task.Delay(RandomGen2.Next(5, 100)).ConfigureAwait(false);
return input;
},
new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 }); // this is the …Run Code Online (Sandbox Code Playgroud) c# multithreading task-parallel-library async-await tpl-dataflow
在将TPL Dataflow移植到我的生产代码之前,我正在尝试使用它.生产代码是传统的生产者/消费者系统 - 生产者生成消息(与金融领域相关),消费者处理这些消息.
我感兴趣的是,如果在某些时候生产者的生产速度比消费者能够处理它的速度快得多(系统会爆炸,或者会发生什么),那么环境将会保持稳定,更重要的是在这些情况下该怎么做.
因此,为了尝试类似的简单应用程序,我想出了以下内容.
var bufferBlock = new BufferBlock<Item>();
var executiondataflowBlockOptions = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount
,
BoundedCapacity = 100000
};
var dataFlowLinkOptions = new DataflowLinkOptions
{
PropagateCompletion = true
};
var actionBlock1 = new ActionBlock<Item>(t => ProcessItem(t),
executiondataflowBlockOptions);
bufferBlock.LinkTo(actionBlock1, dataFlowLinkOptions);
for (int i = 0; i < int.MaxValue; i++)
{
bufferBlock.SendAsync(GenerateItem());
}
bufferBlock.Complete();
Console.ReadLine();
Run Code Online (Sandbox Code Playgroud)
Item 是一个非常简单的课程
internal class Item
{
public Item(string itemId)
{
ItemId = itemId;
}
public string ItemId { get; }
} …Run Code Online (Sandbox Code Playgroud) 我有一个ActionBlock简单地处理来自无限循环的消息。在里面ActionBlock我做了一个http post。当发生任何与网络相关的错误时,该方法会抛出异常并且该块会出现故障/停止。这不是我想要的行为。即使发生异常,我也希望处理运行。(继续打Process方法)来模拟我的程序;
private static ExecutionDataflowBlockOptions processBlockOptions
{
get
{
return new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1
};
}
}
static async Start()
{
processQueue = new
ActionBlock<QueueMessage>(
async (item) =>
{
await Process(item);
},
processBlockOptions);
while (!Stopped)
{
//Read from DB and do logic with item
QueueMessage item= new QueueMessage();
await processQueue.SendAsync(item);
}
}
private async static Task<int> Process(QueueMessage item)
{
try
{
await item.HttpPost(order);
}
catch (Exception ex)
{
//Http endpoint might be …Run Code Online (Sandbox Code Playgroud) 我目前正在使用 ActionBlock 来处理连续启动的异步作业。它非常适合处理发布到它的每个项目,但无法收集每个作业的结果列表。
我可以使用什么来以线程安全的方式收集工作结果?
我的代码目前是这样的:
var actionBlock = new ActionBlock<int> (async i => await Process(i));
for(int i = 0; i < 100; i++)
{
actionBlock.Post(i);
}
actionBlock.Complete();
await actionBlock.Completion;
Run Code Online (Sandbox Code Playgroud)
我尝试使用 TransformBlock 代替,但在等待完成时它会无限期地挂起。完成状态为“WaitingForActivation”。
我的 TransformBlock 代码是这样的:
var transformBlock = new TransformBlock<int, string> (async i => await Process(i));
for(int i = 0; i < 100; i++)
{
actionBlock.Post(i);
}
actionBlock.Complete();
await actionBlock.Completion;
transformBlock.TryReceiveAll(out IList<string> strings);
Run Code Online (Sandbox Code Playgroud) 我有一个链接到基于谓词的动作块的转换块。
// Blocks
public TransformBlock<Document, Document> DocumentCreationTransformBlock =
new TransformBlock<Document, Document>(async document =>
{
return await CreateAsync(document); // REST API call that sets document.NewId
},
new ExecutionDataflowBlockOptions {
BoundedCapacity = 100,
MaxDegreeOfParallelism = 20
});
public ActionBlock<Document> SplitPipelineActionBlock =
new ActionBlock<Document>(async document =>
{ // implementation obfuscated
},
new ExecutionDataflowBlockOptions {
BoundedCapacity = 100
};
// Shared block elements
public DataflowLinkOptions CommonLinkOptions = new DataflowLinkOptions {
PropagateCompletion = true };
// Link mesh
DocumentCreationTransformBlock.LinkTo(SplitPipelineActionBlock,
CommonLinkOptions,
document => !string.IsNullOrEmpty(document.NewId));
DocumentCreationTransformBlock.LinkTo(DataflowBlock.NullTarget<Document>(),
CommonLinkOptions); …Run Code Online (Sandbox Code Playgroud) 我有一个 TPL 数据流,仅使用转换块和操作块就可以正常工作。我添加了一个新的操作块,与现有的操作块同时执行,但我的新操作块永远不会被命中。没有抛出任何错误或异常。我需要在代码中添加一个步骤吗?
var ListDocId = new ConcurrentBag<string>(ConvertDataSetToList(IdDocDataSet));
if (ListDocId.Any())
{
var num_thread = GetThreadNumber();
//Initialize the pipeline of actions
var downloadBlock = new TransformBlock<string, RequestObject>(docId =>
new RequestObject
{
DownloadedFile = ListDownloadocId),
IdDoc = docId
},
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 }
);
var uploadInS3Block = new ActionBlock<S3RequestUpload>(requestS3Upload =>
UploadFileAsync(RequestObject.DownloadedFile, RequestObject.IdDoc),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 }
);
var InsertdocIdIntoDbBlock = new ActionBlock<RequestObject>(s3Request =>
InsertIntotDataBase(s3Request.IdDoc, InsertDate),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 }
);
var options = …Run Code Online (Sandbox Code Playgroud) tpl-dataflow ×10
c# ×9
async-await ×4
.net ×3
task ×2
asynchronous ×1
concurrency ×1
dataflow ×1