标签: tpl-dataflow

如何并行处理项目然后合并结果?

我面临以下问题:

我有一个Foo对象数据流,并将这些对象流式传输到几个并发的进程内任务/线程,这些任务/线程依次处理对象和输出FooResult对象。每个都FooResult包含在其他成员中与Foo在创建FooResult. 但是,并非每个人都Foo必须创建一个FooResult.

我的问题是,我想从整个过程传递一个包装对象,该对象包含原始对象Foo和可能FooResultFoo并发任务中创建的所有对象(如果有)。

注意:我目前使用 TPL 数据流,而每个并发进程都发生在ActionBlock<Foo>BroadCastBlock<Foo>. 它使用SendAsync()目标数据流块来发送可能创建的FooResult. 显然,并发数据流块FooResult在不可预测的时间产生,这正是我目前所面临的问题。我似乎无法弄清楚FooResult总共创建了多少个,ActionBlock<Foo>以便我可以将它们与原始Foo对象捆绑在一起并将其作为包装对象传递。

在伪代码中,它目前如下所示:

BroadCastBlock<Foo> broadCastBlock;
ActionBlock<Foo> aBlock1;
ActionBlock<Foo> aBlock2; 
ActionBlock<FooResult> targetBlock;
broadCastBlock.LinkTo(aBlock1); broadCastBlock.LinkTo(aBlock2);

aBlock1 = new ActionBlock<Foo>(foo =>
{
    //do something here. Sometimes create a FooResult. If then
    targetBlock.SendAsync(fooResult);
});

//similar for aBlock2
Run Code Online (Sandbox Code Playgroud)

但是,当前代码的问题在于,如果在任何操作块中都Foo没有生成单个,则 targetBlock …

c# concurrency merge task-parallel-library tpl-dataflow

5
推荐指数
1
解决办法
1910
查看次数

使用TPL Dataflow创建消息总线

我正在寻找一个轻量级,进程中,异步消息总线,并遇到了TPL Dataflow.

我目前的实现如下(https://gist.github.com/4416655上的完整示例).

public class Bus
{
    private readonly BroadcastBlock<object> broadcast =
        new BroadcastBlock<object>(message => message);

    private readonly ConcurrentDictionary<Guid, IDisposable> subscriptions
        = new ConcurrentDictionary<Guid, IDisposable>();

    public Task SendAsync<TMessage>(TMessage message)
    {
        return SendAsync<TMessage>(message, CancellationToken.None);
    }

    public Task SendAsync<TMessage>(TMessage message, CancellationToken cancellationToken)
    {
        return broadcast.SendAsync(message, cancellationToken);
    }

    public Guid Subscribe<TMessage>(Action<TMessage> handlerAction)
    {
        var handler = new ActionBlock<object>(message => handlerAction((TMessage)message));

        var subscription = broadcast.LinkTo(handler, 
            new DataflowLinkOptions { PropagateCompletion = true }, 
            message => message is TMessage);

        return AddSubscription(subscription);
    }

    public void …
Run Code Online (Sandbox Code Playgroud)

.net task-parallel-library tpl-dataflow

5
推荐指数
1
解决办法
3752
查看次数

使用transformblocks的糟糕表现

我目前正在尝试使用TransformBlocks来使我的代码运行得更快.相反,我发现我基本上没有实现并行化:

正如您所看到的,存在相当多的死空间,很少有I/O或其他问题阻止事物并行运行(注意:所有绿色块都是主线程).

调用代码的基本结构如下:

var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 };
var download = new TransformBlock<string, Tuple<string, string>>(s => sendAndReciveRequest(s), options);
var process = new TransformBlock<Tuple<string, string, TransformBlock<string, Tuple<string, string>>>, List<string>>(s => Helpers.ParseKBDL(s), options);
var toObjects = new TransformBlock<List<string>, List<Food>>(list => toFood(list), options);

for (char char1 = 'a'; char1 < 'z' + 1; char1++)
    download.Post(char1.ToString());

while ((download.InputCount != 0 || download.OutputCount != 0 || process.InputCount != 0) || (Form1.downloadCount != Form1.processCount))
{
    if (download.OutputCount == 0 && download.InputCount …
Run Code Online (Sandbox Code Playgroud)

concurrency performance task-parallel-library c#-5.0 tpl-dataflow

5
推荐指数
0
解决办法
507
查看次数

如何使用TPL数据流库指定无序执行块?

我想设置一个TransformBlock并行处理它的项目.因此,我设置ExecutionDataflowBlockOptions.MaxDegreeOfParallelism为> 1.我不关心消息的顺序,但文档说:

如果指定的最大并行度大于1,则会同时处理多个消息,因此,可能无法按接收顺序处理消息.但是,从块输出消息的顺序将被正确排序.

"正确排序"是否意味着如果队列中有一条消息需要很长的处理时间,则在处理这一条消息之前不会输出更多消息?

如果是这样,我如何指定一个TransformBlock不关心排序的执行块(例如a )?或者我必须在消费端指定我不关心订购?

c# task-parallel-library tpl-dataflow

5
推荐指数
1
解决办法
605
查看次数

多个短期TPL数据流与单个长期运行流

我正在使用TPL数据流来处理Azure辅助角色中的队列中的项目.我应该有一个长时间运行的数据流,还是为我收到的每封邮件生成一个新流?

如果块中抛出错误,该块将停止接受新消息.这意味着如果块中存在异常,则整个数据流将停止处理.

我需要能够在不锁定数据流的情况下承受无效队列输入等异常.我看到两个选项之一:

  1. 我有一个启动单个数据流并在它们离开队列时向它发送消息.每个块的内容都包含在try-catch块中,该块记录异常,然后继续处理.这看起来很笨拙,我认为有更好的方法.
  2. 对于每条消息,我启动一个新的数据流并处理队列消息.如果在任何块中抛出异常,数据流将完成,我只恢复单个消息.我见过的大多数数据流示例都发送了多条消息,所以这也感觉不对.

我已经看到很多关于如何在异常后完成数据流的文档,但很少有关于如何从异常中恢复的文档.

.net c# task-parallel-library async-await tpl-dataflow

5
推荐指数
1
解决办法
700
查看次数

ITargetBlock的最佳实践<TInput> .Completion.ContinueWith()

这个问题是关于ContinueWith()用于处理TPL数据块完成时的最佳实践.

ITargetBlock<TInput>.Completion()方法允许您使用异步处理数据块的完成ContinueWith().

请考虑以下Console应用程序代码,它演示了一个非常基本的用法

private static void Main()
{
    test().Wait();
}

static async Task test()
{
    var transform = new TransformBlock<int, double>(i => i/2.0);
    var output    = new ActionBlock<double>(d => Console.WriteLine(d));

    // Warning CS4014 here:
    transform.Completion.ContinueWith(continuation => output.Complete());
    transform.LinkTo(output);

    for (int i = 0; i < 10; ++i)
        await transform.SendAsync(i);

    transform.Complete();
    await output.Completion;
}
Run Code Online (Sandbox Code Playgroud)

代码有一个非常简单的方法TransformBlock,它将整数除以2.0并将它们转换为双精度数.转换的数据由ActionBlock只处理控制台窗口的值处理.

输出是:

0
0.5
1
1.5
2
2.5
3
3.5
4
4.5
Run Code Online (Sandbox Code Playgroud)

TransformBlock完成后,我也想完成ActionBlock.这样做很方便: …

.net c# task-parallel-library async-await tpl-dataflow

5
推荐指数
1
解决办法
1048
查看次数

ActionBlock &lt;T&gt;与Task.WhenAll

我想知道并行执行多个异步方法的推荐方法是什么?

在System.Threading.Tasks.Dataflow中,我们可以指定最大并行度,但Unbounded可能是Task.WhenAll的默认值?

这个 :

var tasks = new List<Task>();
foreach(var item in items)
{
    tasks.Add(myAsyncMethod(item));
}
await Task.WhenAll(tasks.ToArray());
Run Code Online (Sandbox Code Playgroud)

或者那个 :

var action = new ActionBlock<string>(myAsyncMethod, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
            BoundedCapacity = DataflowBlockOptions.Unbounded,
            MaxMessagesPerTask = DataflowBlockOptions.Unbounded
        });
foreach (var item in items) { }
{
     action.Post(item);
}
action.Complete();

await action.Completion;
Run Code Online (Sandbox Code Playgroud)

c# asynchronous task-parallel-library async-await tpl-dataflow

5
推荐指数
1
解决办法
1405
查看次数

TPL-定义的ExecutionDataflowBlockOptions BoundedCapacity降低了性能

有什么方法可以通过TPL节流来限制性能下降吗?

我有一个复杂的组件管道,并试图限制所需的内存需求。我从多个文件中并行读取,管道中的组件可能会从这些文件的随机部分中读取一些内容,其余组件则进行CPU绑定操作。

我使用通用测试方法将性能测试平台简化为这些测试。

private void TPLPerformaceTest(int generateNumbers, ExecutionDataflowBlockOptions transformBlockOptions)
{
    var transformBlock = new TransformBlock<int, int>(i => i, transformBlockOptions);

    var storedCount = 0;
    var generatedCount = 0;
    var store = new ActionBlock<int>(i => Interlocked.Increment(ref storedCount));

    transformBlock.LinkTo(store);
    transformBlock.Completion.ContinueWith(_ => store.Complete());

    for (int i = 0; i < generateNumbers; i++)
    {
        transformBlock.SendAsync(i).Wait(); //To ensure delivery
        Interlocked.Increment(ref generatedCount);
    }
    transformBlock.Complete();
    store.Completion.Wait();

    Assert.IsTrue(generatedCount == generateNumbers);
    Assert.IsTrue(storedCount == generateNumbers);
}
Run Code Online (Sandbox Code Playgroud)

第一个没有节流。在我的CPU上,大约需要12秒钟才能完成,消耗约800MB的RAM,平均CPU利用率约为35%

[Test]
public void TPLPerformaceUnlimitedTest()
{
    var …
Run Code Online (Sandbox Code Playgroud)

c# parallel-processing performance performance-testing tpl-dataflow

5
推荐指数
1
解决办法
908
查看次数

使用AsObservable观察TPL数据流块而不消耗消息

我有一连串的TPL Dataflow块,并希望观察系统内部的进度。

我知道我可以将a TransformBlock塞入要观察的网格中,将其发布到各种进度更新器中,然后将消息原封不动地返回到下一个块。我不喜欢这种解决方案,因为该块纯粹是因为它的副作用而存在,而且我还必须在我想观察的任何地方更改块链接逻辑。

因此,我想知道是否可以ISourceBlock<T>.AsObservable用来观察网格内消息的传递而无需更改它,也无需消耗消息。如果可行的话,这似乎是一种更纯净,更实用的解决方案。

从我对Rx的(有限的)理解中,这意味着我需要可观察的对象是热的而不是冷的,以便我的progress更新程序可以看到该消息但不使用它。并且.Publish().RefCount()似乎是使可观察到的热点的方法。但是,它根本不起作用按预期-而不是要么block2progress接收并消耗每条消息。

// Set up mesh
var block1 = new TransformBlock<int, int>(i => i + 20, new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 });
var block2 = new ActionBlock<int>(i => Debug.Print("block2:" + i.ToString()), new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 }); 
var obs = block1.AsObservable().Publish().RefCount(); // Declare this here just in case it makes a difference to do it before the LinkTo call. …
Run Code Online (Sandbox Code Playgroud)

.net c# system.reactive tpl-dataflow rx.net

5
推荐指数
2
解决办法
808
查看次数

使用反应式编程写入打开FileStream

我正在编写一个小的记录器,我想打开一次日志文件,在日志消息到达时继续写入,并在程序终止时处理所有内容.

我不确定如何保持FileStream打开在消息到达时反应性地写入消息.

我想从我以前的解决方案更新设计,其中我有一个ConcurrentQueue充当缓冲区,并且在using消耗队列的语句中有一个循环.

具体来说,我想同时利用using语句构造,所以我不必显式关闭流和编写器,以及反应式,无循环编程风格.目前我只知道如何同时使用这些结构之一:using/ loop组合,或显式流关闭/反应组合.

这是我的代码:

    BufferBlock<LogEntry> _buffer = new BufferBlock<LogEntry>();


    // CONSTRUCTOR
    public DefaultLogger(string folder)
    {
        var filePath = Path.Combine(folder, $"{DateTime.Now.ToString("yyyy.MM.dd")}.log");

        _cancellation = new CancellationTokenSource();

        var observable = _buffer.AsObservable();

        using (var stream = File.Create(_filePath))
        using (var writer = new StreamWriter(stream))
        using (var subscription = observable.Subscribe(entry =>
                                    writer.Write(GetFormattedString(entry))))
        {
            while (!_cancellation.IsCancellationRequested)
            {
                // what do I do here?
            }
        }
    }
Run Code Online (Sandbox Code Playgroud)

c# filestream system.reactive tpl-dataflow

5
推荐指数
1
解决办法
364
查看次数