标签: tpl-dataflow

TPL Dataflow,我可以查询一个数据块是否标记为完成但尚未完成?

鉴于以下情况:

BufferBlock<int> sourceBlock = new BufferBlock<int>();
TransformBlock<int, int> targetBlock = new TransformBlock<int, int>(element =>
{
    return element * 2;
});

sourceBlock.LinkTo(targetBlock, new DataflowLinkOptions { PropagateCompletion = true });

//feed some elements into the buffer block
for(int i = 1; i <= 1000000; i++)
{
    sourceBlock.SendAsync(i);
}

sourceBlock.Complete();

targetBlock.Completion.ContinueWith(_ =>
{
    //notify completion of the target block
});
Run Code Online (Sandbox Code Playgroud)

targetBlock似乎永远不会完成,我想原因是,在所有的项目TransformBlock targetBlock在输出队列中等待,因为我还没有链接的targetBlock其他任何数据流块。但是,我真正想要实现的是当(A)targetBlock通知完成和(B)输入队列为空时的通知。我不想关心项目是否仍然位于TransformBlock. 我该怎么办?是让我想查询的完成状态什么的唯一方式sourceBlock,并确保了InputCounttargetBlock是零?我不确定这是否非常稳定(sourceBlock如果中的最后一项sourceBlock已传递给 …

c# concurrency message-passing task-parallel-library tpl-dataflow

5
推荐指数
1
解决办法
1548
查看次数

TPL 数据流,JoinBlock 限制的替代方案?

我寻找 JoinBlock 的替代方案,它可以通过 n-TransformBlocks 链接,并将所有 TransformBlock 源块的消息连接/合并在一起,以便将此类的集合传递给另一个数据流块。

JoinBlock 可以很好地完成工作,但仅限于连接 3 个源块。它还存在许多效率低下的问题(连接 2 个源块的偶数值类型(整数)非常慢)。有没有办法让任务从 TransformBlocks 返回并等到所有 TransformBlocks 都有一个完成的任务才能接受Task<item>

任何替代想法?我可能有 1-20 个这样的转换块,在传递连接的项目集合之前,我需要将哪些项目连接在一起。每个转换块都保证为每个“转换”的输入项准确返回一个输出项。

编辑:要求澄清:

根据我之前的一个问题,我按如下方式设置了我的 JoinBlock:

public Test()
{
    broadCastBlock = new BroadcastBlock<int>(i =>
        {
            return i;
        });

    transformBlock1 = new TransformBlock<int, int>(i =>
        {
            return i;
        });

    transformBlock2 = new TransformBlock<int, int>(i =>
        {
            return i;
        });

    joinBlock = new JoinBlock<int, int>();

    processorBlock = new ActionBlock<Tuple<int, int>>(tuple =>
        {
            //Console.WriteLine("tfb1: " + tuple.Item1 + "tfb2: " + tuple.Item2);
        });

    //Linking …
Run Code Online (Sandbox Code Playgroud)

c# concurrency actor-model task-parallel-library tpl-dataflow

5
推荐指数
1
解决办法
2681
查看次数

TPL 数据流:在保持秩序的同时设计并行性

我以前从未使用过 TPL,所以我想知道是否可以用它来完成:我的应用程序从很多帧创建了一个 gif 图像动画文件。我从一个表示 gif 文件帧的 Bitmap 列表开始,需要对每一帧执行以下操作:

  1. 在框架上绘制一些文本/位图
  2. 裁剪框架
  3. 调整框架大小
  4. 将图像减少到 256 色

显然,这个过程可以对列表中的所有帧并行完成,但对于每个帧,步骤的顺序需要相同。之后,我需要将所有帧写入 gif 文件。因此,所有帧都需要按照它们在原始列表中的相同顺序接收。最重要的是,这个过程可以在第一帧准备好时开始,不需要等到所有帧都处理完。

所以情况就是这样。TPL Dataflow 适合这个吗?如果是的话,谁能给我一个关于如何设计 tpl 块结构以反映上述过程的正确方向的提示?与我发现的一些样本相比,这对我来说似乎相当复杂。

c# asynchronous dataflow task-parallel-library tpl-dataflow

5
推荐指数
1
解决办法
1374
查看次数

如何标记一个 TPL 数据流周期完成?

鉴于 TPL 数据流中的以下设置。

var directory = new DirectoryInfo(@"C:\dev\kortforsyningen_dsm\tiles");

var dirBroadcast=new BroadcastBlock<DirectoryInfo>(dir=>dir);

var dirfinder = new TransformManyBlock<DirectoryInfo, DirectoryInfo>((dir) =>
{
    return directory.GetDirectories();

});
var tileFilder = new TransformManyBlock<DirectoryInfo, FileInfo>((dir) =>
{
    return directory.GetFiles();
});
dirBroadcast.LinkTo(dirfinder);
dirBroadcast.LinkTo(tileFilder);
dirfinder.LinkTo(dirBroadcast);

var block = new XYZTileCombinerBlock<FileInfo>(3, (file) =>
{
    var coordinate = file.FullName.Split('\\').Reverse().Take(3).Reverse().Select(s => int.Parse(Path.GetFileNameWithoutExtension(s))).ToArray();
    return XYZTileCombinerBlock<CloudBlockBlob>.TileXYToQuadKey(coordinate[0], coordinate[1], coordinate[2]);
},
(quad) =>
    XYZTileCombinerBlock<FileInfo>.QuadKeyToTileXY(quad,
        (z, x, y) => new FileInfo(Path.Combine(directory.FullName,string.Format("{0}/{1}/{2}.png", z, x, y)))),
    () => new TransformBlock<string, string>((s) =>
    {
        Trace.TraceInformation("Combining {0}", s);
        return s;
    })); …
Run Code Online (Sandbox Code Playgroud)

.net c# task-parallel-library tpl-dataflow

5
推荐指数
1
解决办法
731
查看次数

TPL 数据流块永远不会在 PropagateCompletion 上完成

自从对我的传播完成管道进行了最后一次更改后,我的一个缓冲区块从未完成。让我总结一下什么是有效的,什么不再是:

以前工作:

A.LinkTo(B, PropagateCompletion);
B.LinkTo(C, PropagateCompletion);
C.LinkTo(D, PropagateCompletion);
D.Receive();

// everything completes
Run Code Online (Sandbox Code Playgroud)

不再工作:

A.LinkTo(B, PropagateCompletion);
C.LinkTo(D, PropagateCompletion);

await A.Completion;
someWriteOnceBlock.Post(B.Count);
// B.Complete(); commented on purpose
B.LinkTo(C, PropagateCompletion);

D.Receive();

// Only A reaches completion
// B remains in 'waiting for activation'
// C executes but obviously never completes since B doesn't either
Run Code Online (Sandbox Code Playgroud)

如果我取消注释注释行,一切正常,但显然该行不是必需的。

不知何故,我的 BufferBlock B 永远不会完成,即使链接它的块已完成并传播其完成,并且它链接的块接收所有缓冲项。

.net c# dataflow task-parallel-library tpl-dataflow

5
推荐指数
1
解决办法
795
查看次数

.NET 排队任务(使用 async/await)

我有大量任务(~1000)需要执行。我在 4 核处理器上运行,所以我想一次并行处理 4 个任务。

为了给您一个起点,这里有一些示例代码。

class Program
{
    public class LongOperation
    {
        private static readonly Random RandomNumberGenerator = new Random(0);
        const int UpdateFrequencyMilliseconds = 100;

        public int CurrentProgress { get; set; }

        public int TargetProcess { get; set; }

        public LongOperation()
        {
            TargetProcess = RandomNumberGenerator.Next(
                (int)TimeSpan.FromSeconds(5).TotalMilliseconds / UpdateFrequencyMilliseconds, 
                (int)TimeSpan.FromSeconds(10).TotalMilliseconds / UpdateFrequencyMilliseconds);
        }

        public async Task Execute()
        {
            while (!IsCompleted)
            {
                await Task.Delay(UpdateFrequencyMilliseconds);
                CurrentProgress++;
            }
        }

        public bool IsCompleted => CurrentProgress >= TargetProcess;
    }

    static void Main(string[] args)
    {
        Task.Factory.StartNew(async …
Run Code Online (Sandbox Code Playgroud)

.net task-parallel-library async-await tpl-dataflow

5
推荐指数
1
解决办法
137
查看次数

将单个结果与多个其他结果连接的数据块

在我的应用程序中,我想将多个字符串与替换值字典连接起来。

readTemplateBlock获取与FileInfos并返回其作为字符串内容喂养。
getReplacersBlock被供给(一次)与单个代用品字典。
joinTemplateAndReplacersBlock应参加的每个项目readTemplateBlock与一个getReplacersBlock结果。

在我当前的设置中,它要求我为我发布的每个文件再次发布相同的替换字典。

// Build
var readTemplateBlock = new TransformBlock<FileInfo, string>(file => File.ReadAllText(file.FullName));
var getReplacersBlock = new WriteOnceBlock<IDictionary<string, string>>(null);
var joinTemplateAndReplacersBlock = new JoinBlock<string, IDictionary<string, string>>();

// Assemble
var propagateComplete = new DataflowLinkOptions {PropagateCompletion = true};

readTemplateBlock.LinkTo(joinTemplateAndReplacersBlock.Target1, propagateComplete);
getReplacersBlock.LinkTo(joinTemplateAndReplacersBlock.Target2, propagateComplete);
joinTemplateAndReplacersBlock.LinkTo(replaceTemplateBlock, propagateComplete);

// Post
foreach (var template in templateFilenames)
{
    getFileBlock.Post(template);
}
getFileBlock.Complete();

getReplacersBlock.Post(replacers);
getReplacersBlock.Complete();
Run Code Online (Sandbox Code Playgroud)

有没有更好的块我错过了?也许我忽略了一个配置选项?

.net c# tpl-dataflow

5
推荐指数
1
解决办法
232
查看次数

为什么 BufferBlock&lt;T&gt;.ReceiveAsync() 在数据可用时挂起?

我是 TPL 数据流的新手。

我正在尝试为相当快速的输入流构建一个节流异步更新。BufferBlock 似乎很适合这个想法,因为我可以调用 ReceiveAll() 来从缓冲区中获取所有内容,并且在某些情况下,我无法在 ReceiveAsync() 上等待以在它到达时拾取下一个元素。

但它似乎有时会挂在 ReceiveAsync() 调用上;并且失败的条件很奇怪。

请注意,我对为什么会挂起很感兴趣。我已经找到了另一种使我正在处理的应用程序工作的方法,但它可能不像我没有使用 TPL Dataflow 那样简洁或可扩展,因为我显然不了解它是如何工作的。

进一步说明这里的关键用法是我执行TryReceiveAll()然后等待 ReceiveAsync()如果失败。这是突发数据到达的常见模式,我想将数据作为批处理进行处理。这就是我不想只在ReceiveAsync()上循环的原因,因此为什么直接挂钩ActionBlockTransformBlock是行不通的。如果我删除TryReceiveAll()我的版本似乎按预期工作;不过,正如其他评论所指出的那样,对于不同的人来说似乎是不同的,所以这可能是巧合。

这是一个失败的示例...将其放入引用和使用System.Threading.Tasks.Dataflow.dll的控制台应用程序中:

using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
Run Code Online (Sandbox Code Playgroud)

失败的例子:

class Program
{
    static void Main(string[] args)
    {
        var context = new CancellationTokenSource();
        var buffer = new BufferBlock<int>(new DataflowBlockOptions { CancellationToken = context.Token });
        var task = Task.Run(() =>ProcessBuffer(buffer, context.Token), context.Token);

        // shove 10 things onto the buffer …
Run Code Online (Sandbox Code Playgroud)

c# task-parallel-library tpl-dataflow

5
推荐指数
0
解决办法
1292
查看次数

TPL Dataflow BufferBlock 线程安全吗?

我有一个相当简单的生产者-消费者模式,其中(简化)我有两个生产者,他们生产由一个消费者消费的输出。

为此,我使用 System.Threading.Tasks.Dataflow.BufferBlock<T>

一个BufferBlock对象被创建。一个Consumer是听这个BufferBlock,并处理任何接收到的输入。

send data to the同时有两个“生产者BufferBlock”

简化:

BufferBlock<int> bufferBlock = new BufferBlock<int>();

async Task Consume()
{
    while(await bufferBlock.OutputAvailable())
    {
         int dataToProcess = await outputAvailable.ReceiveAsync();
         Process(dataToProcess);
    }
}

async Task Produce1()
{
    IEnumerable<int> numbersToProcess = ...;
    foreach (int numberToProcess in numbersToProcess)
    {
         await bufferBlock.SendAsync(numberToProcess);
         // ignore result for this example
    }
}

async Task Produce2()
{
    IEnumerable<int> numbersToProcess = ...;
    foreach (int numberToProcess in numbersToProcess)
    {
         await bufferBlock.SendAsync(numberToProcess);
         // ignore result for …
Run Code Online (Sandbox Code Playgroud)

c# dataflow producer-consumer task-parallel-library tpl-dataflow

5
推荐指数
1
解决办法
2198
查看次数

使用 HttpClient 和 Polly 发送并行请求,但每个主机只有一个,以优雅地处理 429 响应

介绍:

我正在构建一个单节点网络爬虫来简单地验证200 OK.NET Core 控制台应用程序中的URL 。我在不同的主机上有一组 URL,我用HttpClient. 我对使用 Polly 和 TPL Dataflow 还很陌生。

要求:

  1. 我想支持与可配置的MaxDegreeOfParallelism.
  2. 我想将任何给定主机的并行请求数限制为 1(或可配置)。这是为了429 TooManyRequests使用 Polly 策略优雅地处理每个主机的响应。或者,我可以使用断路器在收到一个429响应时取消对同一主机的并发请求,然后一次一个地处理该特定主机?
  3. 我完全没有使用 TPL 数据流,而是使用 Polly Bulkhead 或其他一些机制来限制并行请求,但我不确定为了实现需求 #2 的配置会是什么样子。

当前实施:

我当前的实现是有效的,除了我经常看到x对同一主机的并行请求429大约在同一时间返回......然后,他们都暂停重试策略......然后,他们都猛烈抨击同一台主机再次同时经常仍然收到429s。即使我在整个队列中均匀分布同一主机的多个实例,我的 URL 集合也会因一些429最终仍开始生成s 的特定主机而超重。

收到 a 后429,我想我只想向该主机发送一个并发请求,以尊重远程主机并追求200s。

验证器方法:

public async Task<int> GetValidCount(IEnumerable<Uri> urls, CancellationToken cancellationToken)
{
    var validator = new TransformBlock<Uri, bool>(
        async u => (await _httpClient.GetAsync(u, HttpCompletionOption.ResponseHeadersRead, cancellationToken)).IsSuccessStatusCode, …
Run Code Online (Sandbox Code Playgroud)

c# web-crawler tpl-dataflow .net-core polly

5
推荐指数
1
解决办法
697
查看次数