标签: tpl-dataflow

BatchBlock 使用 TriggerBatch() 之后发送的元素生成批次

我有一个由多个块组成的数据流管道。当元素流经我的处理管道时,我想按 field 对它们进行分组A。为此,我有一个BatchBlockBoundedCapacity. 我在其中存储我的元素,直到我决定应该释放它们。所以我调用TriggerBatch()方法。

private void Forward(TStronglyTyped data)
{
    if (ShouldCreateNewGroup(data))
    {
        GroupingBlock.TriggerBatch();
    }

 GroupingBlock.SendAsync(data).Wait(SendTimeout);
}
Run Code Online (Sandbox Code Playgroud)

这就是它的样子。问题是,生成的批次有时包含下一个发布的元素,该元素不应该在那里。

为了显示:

BatchBlock.InputQueue = {A,A,A}
NextElement = B //we should trigger a Batch!
BatchBlock.TriggerBatch()
BatchBlock.SendAsync(B);
Run Code Online (Sandbox Code Playgroud)

在这一点上,我希望我的批次是{A,A,A},但它是{A,A,A,B}

LikeTriggerBatch()是异步的,SendAsync实际上是在实际批处理之前执行的。

我该如何解决这个问题?我显然不想放在Task.Wait(x)那里(我尝试过,它有效,但当然性能很差)。

c# task-parallel-library tpl-dataflow

4
推荐指数
1
解决办法
1919
查看次数

用于 TPL 数据流的 BroadcastCopyBlock 保证交付

我很高兴为BroadcastCopyBlockTPL 数据流中的以下实现提供一些输入,该实现将收到的消息复制到所有消费者,该消息已注册BroadcastCopyBlock并保证交付给所有消费者,这些消费者在收到消息时链接到块. (与BroadcastBlock不保证消息传递的不同,如果下一个进来,则在前一个消息已传递给所有消费者之前)。

我主要关心的是消息的保留和保留的释放。如果接收块决定不处理消息,会发生什么?我的理解是,这会造成内存泄漏,因为消息将被无限期保留。我在想,我应该以某种方式将消息标记为未使用,但我不确定如何。我正在考虑一些人工消息接收器(ActionBlock没有任何操作),或者我可以将消息标记为已丢弃?

对实施的进一步投入也受到赞赏。

这可能几乎是以下问题的重复,但我更愿意使用我自己的类,而不是创建块的方法。或者这会被认为是不好的风格吗?
在 TPL 数据流中具有保证交付的 BroadcastBlock

/// <summary>
/// Broadcasts the same message to multiple consumers. This does NOT clone the message, all consumers receive an identical message
/// </summary>
/// <typeparam name="T"></typeparam>
public class BrodcastCopyBlock<T> : IPropagatorBlock<T, T>
{
    private ITargetBlock<T> In { get; }

    /// <summary>
    /// Holds a TransformBlock for each target, that subscribed to this block
    /// </summary>
    private readonly IDictionary<ITargetBlock<T>, TransformBlock<T, T>> _OutBlocks …
Run Code Online (Sandbox Code Playgroud)

c# broadcast task-parallel-library tpl-dataflow

4
推荐指数
1
解决办法
1134
查看次数

使用线程和 EventWaitHandle 的生产者/消费者模式

我想这是一种代码审查,但这是我对生产者/消费者模式的实现。我想知道的是,是否会出现ReceivingThread()orSendingThread()方法中的 while 循环停止执行的情况。请注意,它EnqueueSend(DataSendEnqeueInfo info)是从多个不同线程调用的,我可能无法在这里使用任务,因为我肯定必须在单独的线程中使用命令。

private Thread mReceivingThread;
private Thread mSendingThread;
private Queue<DataRecievedEnqeueInfo> mReceivingThreadQueue;
private Queue<DataSendEnqeueInfo> mSendingThreadQueue;
private readonly object mReceivingQueueLock = new object();
private readonly object mSendingQueueLock = new object();
private bool mIsRunning;
EventWaitHandle mRcWaitHandle;
EventWaitHandle mSeWaitHandle;

private void ReceivingThread()
{
    while (mIsRunning)
    {
        mRcWaitHandle.WaitOne();
        DataRecievedEnqeueInfo item = null;
        while (mReceivingThreadQueue.Count > 0)
        {
            lock (mReceivingQueueLock)
            {
                item = mReceivingThreadQueue.Dequeue();
            }
            ProcessReceivingItem(item);
        }
        mRcWaitHandle.Reset();
    }
}

private void SendingThread()
{
    while (mIsRunning)
    {
        mSeWaitHandle.WaitOne(); …
Run Code Online (Sandbox Code Playgroud)

c# multithreading producer-consumer blockingcollection tpl-dataflow

4
推荐指数
1
解决办法
5643
查看次数

TPL 完成与完成

我需要从旧数据库导入客户相关数据,并在此过程中执行多次转换。这意味着单个条目需要执行额外的“事件”(同步产品、创建发票等)。

我最初的解决方案是一种简单的并行方法。它工作正常,但有时会出现问题。如果当前处理的客户需要等待相同类型的事件,他们的处理队列可能会被卡住并最终超时,导致每个底层事件也失败(它们依赖于失败的事件)。这种情况并不总是发生,但还是很烦人。

于是我有了另一个想法,分批工作。我的意思是不仅限制同时处理的客户数量,还限制广播到队列的事件数量。在四处寻找想法时,我找到了这个答案,它指向TPL DataFlow

我做了一个骨架来熟悉它。Complete()我设置了一个简单的管道,但我对和 waiting的用法有点困惑Completion()

步骤如下

  1. 制作一个数字列表(要导入的客户的 ID) - 这是导入逻辑之外的,它只是为了能够触发其余的逻辑
  2. 创建一个BatchBlock(能够限制同时处理的客户数量)
  3. MyClass1根据 id ( TransformBlock<int, MyClass1>)创建单个项目
  4. MyClass2执行一些逻辑并生成( )的集合TransformManyBlock<MyClass1, MyClass2>- 例如,睡眠 1 秒
  5. 对集合中的每个项目执行一些逻辑 ( ActionBlock<MyClass2>) - 例如,休眠 1 秒

这是完整的代码:

public static class Program
{
    private static void Main(string[] args)
    {
        var batchBlock = new BatchBlock<int>(2);
        for (var i = 1; i < 10; i++)
        {
            batchBlock.Post(i);
        }


        batchBlock.Complete();
        while (batchBlock.TryReceive(null, …
Run Code Online (Sandbox Code Playgroud)

c# parallel-processing multithreading task-parallel-library tpl-dataflow

4
推荐指数
1
解决办法
1830
查看次数

接收集合并为每个元素调用其链接块的 TPL 数据流

抱歉,如果已经有类似的问题,我找不到。

我有以下情况:

  1. 我必须对图像进行一些处理,并且 TPL 数据流非常适合这里,因为它允许我轻松地并行执行工作流的不同部分,并以逻辑单元分隔代码
  2. 有一个我无法控制的函数返回图像列表。它用作我的网格(或管道,更准确地说)中的第二个节点
  3. 我在 Dataflow 网格中的所有其他节点都使用单个图像,因此我在第二个项目符号中提到的 I 之后的节点期望获得一个图像(这对于并行性很重要)

是否有一个块(或其他一些解决方案)我可以使用它会接受类型IEnumerable<T>或类似的输入并将其每个元素转发IEnumerable到一个期望接收的块T

我不想重新发明轮子,所以我想在深入研究 API 并尝试编写自定义块之前检查是否有简单的解决方案。此外,将错误和完成传播到管道末端也很重要。

谢谢你的回答!

c# dataflow task-parallel-library tpl-dataflow

4
推荐指数
1
解决办法
576
查看次数

如何迭代BufferBlock <T>中的项目?

我最近开始使用.NET 4.5中的TPL Dataflow库,并且块的整个概念对我来说是新的.我正在我的应用程序中实现生产者 - 消费者队列,我需要防止重复的消息被放入队列,因此需要检查消息是否已经排队.我正在使用一种BufferBlock<Message>类型(Message是一种自定义类型).BufferBlock具有Count属性但在此问题中没有帮助,因为需要唯一标识消息.

有没有办法检查是否BufferBlock包含一个项目或检查所有项目并检查它们?是否有可能转换BufferBlock为允许迭代项目的东西?我正在按照我在MSDN上看到的一个例子,它没有检查项目是否在队列中,但我认为检查队列的内容是一个非常需要的操作.任何帮助表示赞赏.

.net c# producer-consumer task-parallel-library tpl-dataflow

3
推荐指数
1
解决办法
1169
查看次数

TPL数据流:有限容量并等待完成

下面我为了简单起见将一个真实场景复制为LINQPad脚本:

var total = 1 * 1000 * 1000;
var cts = new CancellationTokenSource();
var threads = Environment.ProcessorCount;
int capacity = 10;

var edbOptions = new ExecutionDataflowBlockOptions{BoundedCapacity = capacity, CancellationToken = cts.Token, MaxDegreeOfParallelism = threads};
var dbOptions = new DataflowBlockOptions {BoundedCapacity = capacity, CancellationToken = cts.Token};
var gdbOptions = new GroupingDataflowBlockOptions {BoundedCapacity = capacity, CancellationToken = cts.Token};
var dlOptions = new DataflowLinkOptions {PropagateCompletion = true};

var counter1 = 0;
var counter2 = 0;

var delay1 = 10;
var delay2 = …
Run Code Online (Sandbox Code Playgroud)

c# task-parallel-library tpl-dataflow

3
推荐指数
1
解决办法
2966
查看次数

如何表示我的数据流完成?

我有一个类实现了一个使用TPL Dataflow由3个步骤组成的数据流.

在构造函数中,我将步骤创建为TransformBlocks并使用LinkTo将其链接起来,并将DataflowLinkOptions.PropagateCompletion设置为true.该类公开了一个方法,通过在第一步调用SendAsync来启动工作流.该方法返回工作流程最后一步的"完成"属性.

目前,工作流中的步骤似乎按预期执行,但最终步骤永远不会完成,除非我明确地在其上调用Complete.但这样做会使工作流程短路并且没有执行任何步骤?我究竟做错了什么?

public class MessagePipeline {
   private TransformBlock<object, object> step1;
   private TransformBlock<object, object> step2;
   private TransformBlock<object, object> step3;

   public MessagePipeline() {
      var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
      step1 = new TransformBlock<object, object>(
        x => {
        Console.WriteLine("Step1...");
        return x;
      });
      step2 = new TransformBlock<object, object>(
        x => {
        Console.WriteLine("Step2...");
        return x;
      });
      step3 = new TransformBlock<object, object>(
        x => {
        Console.WriteLine("Step3...");
        return x;
      });

      step1.LinkTo(step2, linkOptions);
      step2.LinkTo(step3, linkOptions);
   }

   public Task Push(object message) {
      step1.SendAsync(message); …
Run Code Online (Sandbox Code Playgroud)

c# task-parallel-library tpl-dataflow

3
推荐指数
1
解决办法
987
查看次数

将基于C#BlockingCollection的代码转换为TPL数据流

我有一个特定的问题,我相信可以使用TPL数据流解决.我对此很陌生,所以需要你的帮助来加快我的理解.我的代码目前是这样的:

目前的代码

其中Process1,Process2,Process3均为Task.对象通过阻塞集合从一个块传递到另一个块.我想这样做:

需要这样的

我读到了TransformBlock,ActionBlock和BatchBlock ..您可以帮助我如何使用这些类来实现上述设计.

c# task-parallel-library tpl-dataflow

3
推荐指数
1
解决办法
301
查看次数

TPL Dataflow与普通的Semaphore

我需要制作可扩展的流程.该进程主要具有I/O操作和一些次要CPU操作(主要是反序列化字符串).该进程在数据库中查询url列表,然后从这些url中获取数据,将下载的数据deserilize到对象,然后将一些数据保存到crm动态以及另一个数据库中.之后我需要更新第一个处理网址的数据库.部分要求是使并行度可配置.

最初我想通过一系列任务实现它,等待并使用Semaphore限制并行性 - 非常简单.然后我读了@Stephen Cleary的一些帖子和答案,建议使用TPL Dataflow,我认为它可能是一个很好的候选人.但是,我想通过使用Dataflow来确保我使代码"复杂化",这是值得的.我也有一个建议使用ForEachAsync扩展方法也很简单,但是我不确定它是否会因为它对集合进行分区而导致内存开销.

TPL Dataflow是否适合这种情况?它如何比Semaphore或ForEachAsync方法更好 - 如果我通过TPL DataFlow在每个其他选项(Semaphore/ForEachASync)上实现它,我将获得什么好处?

semaphore dataflow task task-parallel-library tpl-dataflow

3
推荐指数
1
解决办法
402
查看次数