标签: tpl-dataflow

TPL数据流加速?

我想知道以下代码是否可以优化以更快地执行.在一个非常简单的数据流结构中,我目前似乎每秒大约有140万条简单消息.我知道这个示例进程同步传递/转换消息,但是,我目前正在测试TPL Dataflow作为我自己的基于任务和并发集合的自定义解决方案的替代.我知道术语"并发"已经建议我并行运行,但是出于当前的测试目的,我通过同步推送消息在我自己的解决方案上,每秒钟我得到大约510万条消息.我在这里缺少什么,我读过TPL Dataflow被推为高吞吐量,低延迟的解决方案,但到目前为止我必须忽略性能调整.有谁能指出我正确的方向吗?

class TPLDataFlowExperiments
{
    public TPLDataFlowExperiments()
    {
        var buf1 = new BufferBlock<int>();

        var transform = new TransformBlock<int, string>(t =>
            {
                return "";
            });

        var action = new ActionBlock<string>(s =>
            {
                //Thread.Sleep(100);
                //Console.WriteLine(s);
            });

        buf1.LinkTo(transform);
        transform.LinkTo(action);

        //Propagate all Completions down the flow
        buf1.Completion.ContinueWith(t =>
        {
            transform.Complete();
            transform.Completion.ContinueWith(u =>
            {
                action.Complete();
            });
        });

        Stopwatch watch = new Stopwatch();
        watch.Start();

        int cap = 10000000;
        for (int i = 0; i < cap; i++)
        {
            buf1.Post(i);
        }

        //Mark Buffer as Complete
        buf1.Complete(); …
Run Code Online (Sandbox Code Playgroud)

c# concurrency task-parallel-library tpl-dataflow

4
推荐指数
1
解决办法
4727
查看次数

TPL Dataflow被削减了吗?

我打算玩TPL数据流,但我似乎无法在VS 2012中找到它(包括扩展和更新/ nuget对话框内).他们剪了它,还是我只是在错误的地方?

c# task-parallel-library tpl-dataflow

4
推荐指数
1
解决办法
348
查看次数

如何在 TPL 数据流中安排流量控制?

我试图在 TPL 数据流中控制数据流。我有一个非常快的生产者和一个非常慢的消费者。(我的实际代码更复杂,但无论如何,这是一个非常好的模型,它重现了问题。)

当我运行它时,代码开始消耗内存,就像它已经过时一样——并且生产者上的输出队列尽可能快地填满。我真正希望看到的是生产者停止运行一段时间,直到消费者有机会要求它。从我对文档的阅读来看,这就是应该发生的事情:也就是说,我认为生产者会等到消费者有空间。

事实并非如此,很明显。如何修复它以便队列不会发疯?

using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Threading;

namespace MemoryLeakTestCase
{
    class Program
    {

        static void Main(string[] args)
        {
            var CreateData = new TransformManyBlock<int, string>(ignore =>
            {
                return Enumerable.Range(0, 1000 * 1000 * 1000).Select((s,i) => "Hello, World " + i);
            });

            var ParseFile = new TransformManyBlock<string, string>(fileContent =>
            {
                Thread.Sleep(1000);
                return Enumerable.Range(0, 100).Select((sst, iii) => "Hello, " + iii);
            }, new ExecutionDataflowBlockOptions() { BoundedCapacity = 1000 }
            );

            var EndOfTheLine = new ActionBlock<object>(f …
Run Code Online (Sandbox Code Playgroud)

c# asynchronous task-parallel-library tpl-dataflow

4
推荐指数
1
解决办法
1373
查看次数

如何在TPL Dataflow中分支逻辑?

我是TPL数据流的新手,请原谅我,如果这是一个简单的问题.

我有一个输入缓冲区块,它采用基类.如何根据派生类型从那里分支到块?例如:

var inputBlock = new BufferBlock<EventBase>();
//if EventBase is Meeting then go to block X
//if EventBase is Appointment the go to block Y
Run Code Online (Sandbox Code Playgroud)

谢谢!

.net c# task-parallel-library tpl-dataflow

4
推荐指数
1
解决办法
702
查看次数

"有界"BatchBlock => ActionBlock.如何完成正确的方法?

我正在尝试使用链接到操作块的有界批处理块.我知道批处理块中的项目进料结束时我想触发完成链.

问题是:如果我BatchBlock<T>是给定的,BoundedCapacity我将无法在动作块中触发所有项目.

这是我的问题的一个示例,它应该(在我对TPL数据流的理解......)中打印0到124,但最终打印0到99.

必须有一些我缺少的东西...也许BoundedCapacity意味着"当队列数超过xxx时丢弃项目......"如果是这样的话我怎样才能实现保证的最大内存消耗?

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks.Dataflow;

namespace ConsoleApplication
{
    class Program
    {
        static void Main(string[] args)
        {
            int itemsCount = 125;
            List<int> ints = new List<int>(itemsCount);
            for (int i = 0; i < itemsCount; i++)
                ints.Add(i);

            BatchBlock<int> batchBlock = new BatchBlock<int>(50,new GroupingDataflowBlockOptions(){BoundedCapacity = 100});
            ActionBlock<int[]> actionBlock = new ActionBlock<int[]>(intsBatch =>
            {
                Thread.Sleep(1000);
                foreach (int i in intsBatch)
                    Console.WriteLine(i);               
            });
            batchBlock.LinkTo(actionBlock, new DataflowLinkOptions() { PropagateCompletion = true });

            // …
Run Code Online (Sandbox Code Playgroud)

.net c# task-parallel-library tpl-dataflow

4
推荐指数
1
解决办法
1592
查看次数

如何使用委托构造TransformManyBlock

我是C#TPL和DataFlow的新手,我正在努力研究如何实现TPL DataFlow TransformManyBlock.我正在将其他一些代码翻译成DataFlow.我的(简化)原始代码是这样的:

private IEnumerable<byte[]> ExtractFromByteStream(Byte[] byteStream)
{
    yield return byteStream; // Plus operations on the array
}
Run Code Online (Sandbox Code Playgroud)

在另一种方法中,我会这样称呼它:

foreach (byte[] myByteArray in ExtractFromByteStream(byteStream))
{
    // Do stuff with myByteArray
}
Run Code Online (Sandbox Code Playgroud)

我正在尝试创建一个TransformManyBlock来生成来自较大输入数组(实际上是二进制流)的多个小数组(实际上是数据包),因此in和out都是类型byte[].

我尝试了下面的内容,但我知道我错了.我想使用与以前相同的函数构造这个块,并且只是将TransformManyBlock它包裹起来.我收到一个错误"这个电话很模糊......"

var streamTransformManyBlock = new TransformManyBlock<byte[], byte[]>(ExtractFromByteStream);
Run Code Online (Sandbox Code Playgroud)

.net c# dataflow task-parallel-library tpl-dataflow

4
推荐指数
1
解决办法
1018
查看次数

在 tpl 数据流块中抛出的 OperationCanceledException 被吞噬

出于某种原因,当 anOperationCanceledException被抛出到 内时IDataflowBlock,块不会将此异常传播到其IDataflowBlock.Completion任务。运行下面的代码示例会返回一个意外的IDataflowBlock.Completion.Status == TaskStatus.RanToCompletion.

但是,如果块中抛出的异常类型更改为 an ArgumentNullException,则IDataflowBlock.Completion.Status更改TaskStatus.Faulted和 异常将保存在其InnerException属性中。

任何想法为什么OperationCanceledException被吞下?

[TestFixture]
public class TplDataBlockExceptionTest
{
    [Test]
    public void ShouldThrowException()
    {
        // Arrange
        var block = new TransformBlock<int, string>(i =>
        {
            throw new OperationCanceledException();
            return i.ToString();
        });

        // Act

        block.Post(1);
        block.Complete();

        try
        {
            block.Completion.Wait();
        }
        catch (Exception)
        {
            // ignored
        }

        // Assert

        Assert.That(block.Completion.IsFaulted);
    }
}
Run Code Online (Sandbox Code Playgroud)

c# task-parallel-library tpl-dataflow

4
推荐指数
1
解决办法
259
查看次数

是否有不接受输入但返回输出的 TPL 数据流块?

我的问题的标题说明了一切。

我正在寻找不需要输入的 TPL 数据流块。

现在我正在使用转换块,但它的输入未使用。

c# multithreading asynchronous task-parallel-library tpl-dataflow

4
推荐指数
1
解决办法
1114
查看次数

如何正确使用 ConcurrentQueue 中的块

我需要实现一个可以从多个线程填充的请求队列。当此队列大于 1000 个已完成的请求时,应将此请求存储到数据库中。这是我的实现:

public class RequestQueue
{
    private static BlockingCollection<VerificationRequest> _queue = new BlockingCollection<VerificationRequest>();
    private static ConcurrentQueue<VerificationRequest> _storageQueue = new ConcurrentQueue<VerificationRequest>();

    private static volatile bool isLoading = false;
    private static object _lock = new object();

    public static void Launch()
    {
        Task.Factory.StartNew(execute);
    }

    public static void Add(VerificationRequest request)
    {
        _queue.Add(request);
    }

    public static void AddRange(List<VerificationRequest> requests)
    {
        Parallel.ForEach(requests, new ParallelOptions() {MaxDegreeOfParallelism = 3},
            (request) => { _queue.Add(request); });
    }


    private static void execute()
    {
        Parallel.ForEach(_queue.GetConsumingEnumerable(), new ParallelOptions {MaxDegreeOfParallelism = 5}, EnqueueSaveRequest …
Run Code Online (Sandbox Code Playgroud)

c# multithreading producer-consumer task-parallel-library tpl-dataflow

4
推荐指数
1
解决办法
630
查看次数

如何用异常处理创建永无止境的DataFlow Mesh?

我正在创建一个使用TPL DataFlow的任务处理器。我将遵循生产者消费者模型,在生产者模型中,生产者偶尔会生产一些要处理的商品,而消费者则一直在等待新商品的到来。这是我的代码:

async Task Main()
{
    var runner = new Runner();
    CancellationTokenSource cts = new CancellationTokenSource();
    Task runnerTask = runner.ExecuteAsync(cts.Token);

    await Task.WhenAll(runnerTask);
}

public class Runner
{
    public async Task ExecuteAsync(CancellationToken cancellationToken) {
        var random = new Random();

        ActionMeshProcessor processor = new ActionMeshProcessor();
        await processor.Init(cancellationToken);

        while (!cancellationToken.IsCancellationRequested)
        {
            await Task.Delay(TimeSpan.FromSeconds(1)); // wait before enqueuing more

            int[] items = GetItems(random.Next(3, 7));

            await processor.ProcessBlockAsync(items);
        }
    }

    private int[] GetItems(int count)
    {
        Random randNum = new Random();

        int[] arr = new int[count];
        for …
Run Code Online (Sandbox Code Playgroud)

c# concurrency task-parallel-library tpl-dataflow

4
推荐指数
1
解决办法
332
查看次数