我有一个由多个块组成的数据流管道。当元素流经我的处理管道时,我想按 field 对它们进行分组A。为此,我有一个BatchBlock高BoundedCapacity. 我在其中存储我的元素,直到我决定应该释放它们。所以我调用TriggerBatch()方法。
private void Forward(TStronglyTyped data)
{
if (ShouldCreateNewGroup(data))
{
GroupingBlock.TriggerBatch();
}
GroupingBlock.SendAsync(data).Wait(SendTimeout);
}
Run Code Online (Sandbox Code Playgroud)
这就是它的样子。问题是,生成的批次有时包含下一个发布的元素,该元素不应该在那里。
为了显示:
BatchBlock.InputQueue = {A,A,A}
NextElement = B //we should trigger a Batch!
BatchBlock.TriggerBatch()
BatchBlock.SendAsync(B);
Run Code Online (Sandbox Code Playgroud)
在这一点上,我希望我的批次是{A,A,A},但它是{A,A,A,B}
LikeTriggerBatch()是异步的,SendAsync实际上是在实际批处理之前执行的。
我该如何解决这个问题?我显然不想放在Task.Wait(x)那里(我尝试过,它有效,但当然性能很差)。
我很高兴为BroadcastCopyBlockTPL 数据流中的以下实现提供一些输入,该实现将收到的消息复制到所有消费者,该消息已注册BroadcastCopyBlock并保证交付给所有消费者,这些消费者在收到消息时链接到块. (与BroadcastBlock不保证消息传递的不同,如果下一个进来,则在前一个消息已传递给所有消费者之前)。
我主要关心的是消息的保留和保留的释放。如果接收块决定不处理消息,会发生什么?我的理解是,这会造成内存泄漏,因为消息将被无限期保留。我在想,我应该以某种方式将消息标记为未使用,但我不确定如何。我正在考虑一些人工消息接收器(ActionBlock没有任何操作),或者我可以将消息标记为已丢弃?
对实施的进一步投入也受到赞赏。
这可能几乎是以下问题的重复,但我更愿意使用我自己的类,而不是创建块的方法。或者这会被认为是不好的风格吗?
在 TPL 数据流中具有保证交付的 BroadcastBlock
/// <summary>
/// Broadcasts the same message to multiple consumers. This does NOT clone the message, all consumers receive an identical message
/// </summary>
/// <typeparam name="T"></typeparam>
public class BrodcastCopyBlock<T> : IPropagatorBlock<T, T>
{
private ITargetBlock<T> In { get; }
/// <summary>
/// Holds a TransformBlock for each target, that subscribed to this block
/// </summary>
private readonly IDictionary<ITargetBlock<T>, TransformBlock<T, T>> _OutBlocks …Run Code Online (Sandbox Code Playgroud) 我想这是一种代码审查,但这是我对生产者/消费者模式的实现。我想知道的是,是否会出现ReceivingThread()orSendingThread()方法中的 while 循环停止执行的情况。请注意,它EnqueueSend(DataSendEnqeueInfo info)是从多个不同线程调用的,我可能无法在这里使用任务,因为我肯定必须在单独的线程中使用命令。
private Thread mReceivingThread;
private Thread mSendingThread;
private Queue<DataRecievedEnqeueInfo> mReceivingThreadQueue;
private Queue<DataSendEnqeueInfo> mSendingThreadQueue;
private readonly object mReceivingQueueLock = new object();
private readonly object mSendingQueueLock = new object();
private bool mIsRunning;
EventWaitHandle mRcWaitHandle;
EventWaitHandle mSeWaitHandle;
private void ReceivingThread()
{
while (mIsRunning)
{
mRcWaitHandle.WaitOne();
DataRecievedEnqeueInfo item = null;
while (mReceivingThreadQueue.Count > 0)
{
lock (mReceivingQueueLock)
{
item = mReceivingThreadQueue.Dequeue();
}
ProcessReceivingItem(item);
}
mRcWaitHandle.Reset();
}
}
private void SendingThread()
{
while (mIsRunning)
{
mSeWaitHandle.WaitOne(); …Run Code Online (Sandbox Code Playgroud) c# multithreading producer-consumer blockingcollection tpl-dataflow
我需要从旧数据库导入客户相关数据,并在此过程中执行多次转换。这意味着单个条目需要执行额外的“事件”(同步产品、创建发票等)。
我最初的解决方案是一种简单的并行方法。它工作正常,但有时会出现问题。如果当前处理的客户需要等待相同类型的事件,他们的处理队列可能会被卡住并最终超时,导致每个底层事件也失败(它们依赖于失败的事件)。这种情况并不总是发生,但还是很烦人。
于是我有了另一个想法,分批工作。我的意思是不仅限制同时处理的客户数量,还限制广播到队列的事件数量。在四处寻找想法时,我找到了这个答案,它指向TPL DataFlow。
我做了一个骨架来熟悉它。Complete()我设置了一个简单的管道,但我对和 waiting的用法有点困惑Completion()。
步骤如下
BatchBlock(能够限制同时处理的客户数量)MyClass1根据 id ( TransformBlock<int, MyClass1>)创建单个项目MyClass2执行一些逻辑并生成( )的集合TransformManyBlock<MyClass1, MyClass2>- 例如,睡眠 1 秒ActionBlock<MyClass2>) - 例如,休眠 1 秒这是完整的代码:
public static class Program
{
private static void Main(string[] args)
{
var batchBlock = new BatchBlock<int>(2);
for (var i = 1; i < 10; i++)
{
batchBlock.Post(i);
}
batchBlock.Complete();
while (batchBlock.TryReceive(null, …Run Code Online (Sandbox Code Playgroud) c# parallel-processing multithreading task-parallel-library tpl-dataflow
抱歉,如果已经有类似的问题,我找不到。
我有以下情况:
是否有一个块(或其他一些解决方案)我可以使用它会接受类型IEnumerable<T>或类似的输入并将其每个元素转发IEnumerable到一个期望接收的块T?
我不想重新发明轮子,所以我想在深入研究 API 并尝试编写自定义块之前检查是否有简单的解决方案。此外,将错误和完成传播到管道末端也很重要。
谢谢你的回答!
我最近开始使用.NET 4.5中的TPL Dataflow库,并且块的整个概念对我来说是新的.我正在我的应用程序中实现生产者 - 消费者队列,我需要防止重复的消息被放入队列,因此需要检查消息是否已经排队.我正在使用一种BufferBlock<Message>类型(Message是一种自定义类型).BufferBlock具有Count属性但在此问题中没有帮助,因为需要唯一标识消息.
有没有办法检查是否BufferBlock包含一个项目或检查所有项目并检查它们?是否有可能转换BufferBlock为允许迭代项目的东西?我正在按照我在MSDN上看到的一个例子,它没有检查项目是否在队列中,但我认为检查队列的内容是一个非常需要的操作.任何帮助表示赞赏.
.net c# producer-consumer task-parallel-library tpl-dataflow
下面我为了简单起见将一个真实场景复制为LINQPad脚本:
var total = 1 * 1000 * 1000;
var cts = new CancellationTokenSource();
var threads = Environment.ProcessorCount;
int capacity = 10;
var edbOptions = new ExecutionDataflowBlockOptions{BoundedCapacity = capacity, CancellationToken = cts.Token, MaxDegreeOfParallelism = threads};
var dbOptions = new DataflowBlockOptions {BoundedCapacity = capacity, CancellationToken = cts.Token};
var gdbOptions = new GroupingDataflowBlockOptions {BoundedCapacity = capacity, CancellationToken = cts.Token};
var dlOptions = new DataflowLinkOptions {PropagateCompletion = true};
var counter1 = 0;
var counter2 = 0;
var delay1 = 10;
var delay2 = …Run Code Online (Sandbox Code Playgroud) 我有一个类实现了一个使用TPL Dataflow由3个步骤组成的数据流.
在构造函数中,我将步骤创建为TransformBlocks并使用LinkTo将其链接起来,并将DataflowLinkOptions.PropagateCompletion设置为true.该类公开了一个方法,通过在第一步调用SendAsync来启动工作流.该方法返回工作流程最后一步的"完成"属性.
目前,工作流中的步骤似乎按预期执行,但最终步骤永远不会完成,除非我明确地在其上调用Complete.但这样做会使工作流程短路并且没有执行任何步骤?我究竟做错了什么?
public class MessagePipeline {
private TransformBlock<object, object> step1;
private TransformBlock<object, object> step2;
private TransformBlock<object, object> step3;
public MessagePipeline() {
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
step1 = new TransformBlock<object, object>(
x => {
Console.WriteLine("Step1...");
return x;
});
step2 = new TransformBlock<object, object>(
x => {
Console.WriteLine("Step2...");
return x;
});
step3 = new TransformBlock<object, object>(
x => {
Console.WriteLine("Step3...");
return x;
});
step1.LinkTo(step2, linkOptions);
step2.LinkTo(step3, linkOptions);
}
public Task Push(object message) {
step1.SendAsync(message); …Run Code Online (Sandbox Code Playgroud) 我有一个特定的问题,我相信可以使用TPL数据流解决.我对此很陌生,所以需要你的帮助来加快我的理解.我的代码目前是这样的:
其中Process1,Process2,Process3均为Task.对象通过阻塞集合从一个块传递到另一个块.我想这样做:
我读到了TransformBlock,ActionBlock和BatchBlock ..您可以帮助我如何使用这些类来实现上述设计.
我需要制作可扩展的流程.该进程主要具有I/O操作和一些次要CPU操作(主要是反序列化字符串).该进程在数据库中查询url列表,然后从这些url中获取数据,将下载的数据deserilize到对象,然后将一些数据保存到crm动态以及另一个数据库中.之后我需要更新第一个处理网址的数据库.部分要求是使并行度可配置.
最初我想通过一系列任务实现它,等待并使用Semaphore限制并行性 - 非常简单.然后我读了@Stephen Cleary的一些帖子和答案,建议使用TPL Dataflow,我认为它可能是一个很好的候选人.但是,我想通过使用Dataflow来确保我使代码"复杂化",这是值得的.我也有一个建议使用ForEachAsync扩展方法也很简单,但是我不确定它是否会因为它对集合进行分区而导致内存开销.
TPL Dataflow是否适合这种情况?它如何比Semaphore或ForEachAsync方法更好 - 如果我通过TPL DataFlow在每个其他选项(Semaphore/ForEachASync)上实现它,我将获得什么好处?