标签: tpl-dataflow

TPL 数据流,JoinBlock 限制的替代方案?

我寻找 JoinBlock 的替代方案,它可以通过 n-TransformBlocks 链接,并将所有 TransformBlock 源块的消息连接/合并在一起,以便将此类的集合传递给另一个数据流块。

JoinBlock 可以很好地完成工作,但仅限于连接 3 个源块。它还存在许多效率低下的问题(连接 2 个源块的偶数值类型(整数)非常慢)。有没有办法让任务从 TransformBlocks 返回并等到所有 TransformBlocks 都有一个完成的任务才能接受Task<item>

任何替代想法?我可能有 1-20 个这样的转换块,在传递连接的项目集合之前,我需要将哪些项目连接在一起。每个转换块都保证为每个“转换”的输入项准确返回一个输出项。

编辑:要求澄清:

根据我之前的一个问题,我按如下方式设置了我的 JoinBlock:

public Test()
{
    broadCastBlock = new BroadcastBlock<int>(i =>
        {
            return i;
        });

    transformBlock1 = new TransformBlock<int, int>(i =>
        {
            return i;
        });

    transformBlock2 = new TransformBlock<int, int>(i =>
        {
            return i;
        });

    joinBlock = new JoinBlock<int, int>();

    processorBlock = new ActionBlock<Tuple<int, int>>(tuple =>
        {
            //Console.WriteLine("tfb1: " + tuple.Item1 + "tfb2: " + tuple.Item2);
        });

    //Linking …
Run Code Online (Sandbox Code Playgroud)

c# concurrency actor-model task-parallel-library tpl-dataflow

5
推荐指数
1
解决办法
2681
查看次数

TPL 数据流:在保持秩序的同时设计并行性

我以前从未使用过 TPL,所以我想知道是否可以用它来完成:我的应用程序从很多帧创建了一个 gif 图像动画文件。我从一个表示 gif 文件帧的 Bitmap 列表开始,需要对每一帧执行以下操作:

  1. 在框架上绘制一些文本/位图
  2. 裁剪框架
  3. 调整框架大小
  4. 将图像减少到 256 色

显然,这个过程可以对列表中的所有帧并行完成,但对于每个帧,步骤的顺序需要相同。之后,我需要将所有帧写入 gif 文件。因此,所有帧都需要按照它们在原始列表中的相同顺序接收。最重要的是,这个过程可以在第一帧准备好时开始,不需要等到所有帧都处理完。

所以情况就是这样。TPL Dataflow 适合这个吗?如果是的话,谁能给我一个关于如何设计 tpl 块结构以反映上述过程的正确方向的提示?与我发现的一些样本相比,这对我来说似乎相当复杂。

c# asynchronous dataflow task-parallel-library tpl-dataflow

5
推荐指数
1
解决办法
1374
查看次数

使用transformblocks的糟糕表现

我目前正在尝试使用TransformBlocks来使我的代码运行得更快.相反,我发现我基本上没有实现并行化:

正如您所看到的,存在相当多的死空间,很少有I/O或其他问题阻止事物并行运行(注意:所有绿色块都是主线程).

调用代码的基本结构如下:

var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 };
var download = new TransformBlock<string, Tuple<string, string>>(s => sendAndReciveRequest(s), options);
var process = new TransformBlock<Tuple<string, string, TransformBlock<string, Tuple<string, string>>>, List<string>>(s => Helpers.ParseKBDL(s), options);
var toObjects = new TransformBlock<List<string>, List<Food>>(list => toFood(list), options);

for (char char1 = 'a'; char1 < 'z' + 1; char1++)
    download.Post(char1.ToString());

while ((download.InputCount != 0 || download.OutputCount != 0 || process.InputCount != 0) || (Form1.downloadCount != Form1.processCount))
{
    if (download.OutputCount == 0 && download.InputCount …
Run Code Online (Sandbox Code Playgroud)

concurrency performance task-parallel-library c#-5.0 tpl-dataflow

5
推荐指数
0
解决办法
507
查看次数

一起使用 BlockingCollection&lt;T&gt; 和 TPL 数据流时出现死锁

我已经编写了一个复制该问题的示例测试。这不是我的实际代码,我尝试编写一个小代码。如果将边界容量增加到迭代次数,从而有效地使其没有边界,则不会出现死锁,如果将最大并行度设置为较小的数字(例如 1),则不会出现死锁。

再说一次,我知道下面的代码不是很好,但我实际发现的代码要大得多并且难以理解。基本上有一个与远程资源的连接的阻塞对象池,并且流中的几个块使用了该连接。

关于如何解决这个问题有什么想法吗?乍一看,这似乎是数据流的问题。当我中断查看线程时,我看到许多线程在 Add 上被阻塞,0 个线程在 take 上被阻塞。addBlocks 出站队列中有几个项目尚未传播到 takeblock,因此它被卡住或死锁。

    var blockingCollection = new BlockingCollection<int>(10000);

    var takeBlock = new ActionBlock<int>((i) =>
    {
        int j = blockingCollection.Take();

    }, new ExecutionDataflowBlockOptions()
           {
              MaxDegreeOfParallelism = 20,
              SingleProducerConstrained = true
           });

    var addBlock = new TransformBlock<int, int>((i) => 
    {
        blockingCollection.Add(i);
        return i;

    }, new ExecutionDataflowBlockOptions()
           {
              MaxDegreeOfParallelism = 20
           });

    addBlock.LinkTo(takeBlock, new DataflowLinkOptions()
          {
             PropagateCompletion = true
          });

    for (int i = 0; i < 100000; i++)
    {
        addBlock.Post(i);
    }

    addBlock.Complete();
    await addBlock.Completion; …
Run Code Online (Sandbox Code Playgroud)

c# multithreading task-parallel-library blockingcollection tpl-dataflow

5
推荐指数
1
解决办法
877
查看次数

如何标记一个 TPL 数据流周期完成?

鉴于 TPL 数据流中的以下设置。

var directory = new DirectoryInfo(@"C:\dev\kortforsyningen_dsm\tiles");

var dirBroadcast=new BroadcastBlock<DirectoryInfo>(dir=>dir);

var dirfinder = new TransformManyBlock<DirectoryInfo, DirectoryInfo>((dir) =>
{
    return directory.GetDirectories();

});
var tileFilder = new TransformManyBlock<DirectoryInfo, FileInfo>((dir) =>
{
    return directory.GetFiles();
});
dirBroadcast.LinkTo(dirfinder);
dirBroadcast.LinkTo(tileFilder);
dirfinder.LinkTo(dirBroadcast);

var block = new XYZTileCombinerBlock<FileInfo>(3, (file) =>
{
    var coordinate = file.FullName.Split('\\').Reverse().Take(3).Reverse().Select(s => int.Parse(Path.GetFileNameWithoutExtension(s))).ToArray();
    return XYZTileCombinerBlock<CloudBlockBlob>.TileXYToQuadKey(coordinate[0], coordinate[1], coordinate[2]);
},
(quad) =>
    XYZTileCombinerBlock<FileInfo>.QuadKeyToTileXY(quad,
        (z, x, y) => new FileInfo(Path.Combine(directory.FullName,string.Format("{0}/{1}/{2}.png", z, x, y)))),
    () => new TransformBlock<string, string>((s) =>
    {
        Trace.TraceInformation("Combining {0}", s);
        return s;
    })); …
Run Code Online (Sandbox Code Playgroud)

.net c# task-parallel-library tpl-dataflow

5
推荐指数
1
解决办法
731
查看次数

多次调用 GetStringAsync 的更有效方法?

我有(我的网址列表大约有 1000 个网址),我想知道是否有更有效的方法从同一站点调用多个网址(已经更改了ServicePointManager.DefaultConnectionLimit)。

另外,是在每次调用时重用相同的HttpClient还是创建新的更好,下面仅使用一个而不是多个。

using (var client = new HttpClient { Timeout = new TimeSpan(0, 5, 0) })
{
    var tasks = urls.Select(async url =>
    {
        await client.GetStringAsync(url).ContinueWith(response =>
        {
           var resultHtml = response.Result;
           //process the html

        });
    }).ToList();

    Task.WaitAll(tasks.ToArray());
}
Run Code Online (Sandbox Code Playgroud)

正如@cory建议的,
这里是使用的修改后的代码TPL,但是我必须设置MaxDegreeOfParallelism = 100以达到与基于任务的速度大致相同的速度,下面的代码可以改进吗?

var downloader = new ActionBlock<string>(async url =>
{
    var client = new WebClient();
    var resultHtml = await client.DownloadStringTaskAsync(new Uri(url));


}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 100 }); …
Run Code Online (Sandbox Code Playgroud)

c# multithreading task-parallel-library tpl-dataflow

5
推荐指数
1
解决办法
1391
查看次数

为什么 BufferBlock&lt;T&gt;.ReceiveAsync() 在数据可用时挂起?

我是 TPL 数据流的新手。

我正在尝试为相当快速的输入流构建一个节流异步更新。BufferBlock 似乎很适合这个想法,因为我可以调用 ReceiveAll() 来从缓冲区中获取所有内容,并且在某些情况下,我无法在 ReceiveAsync() 上等待以在它到达时拾取下一个元素。

但它似乎有时会挂在 ReceiveAsync() 调用上;并且失败的条件很奇怪。

请注意,我对为什么会挂起很感兴趣。我已经找到了另一种使我正在处理的应用程序工作的方法,但它可能不像我没有使用 TPL Dataflow 那样简洁或可扩展,因为我显然不了解它是如何工作的。

进一步说明这里的关键用法是我执行TryReceiveAll()然后等待 ReceiveAsync()如果失败。这是突发数据到达的常见模式,我想将数据作为批处理进行处理。这就是我不想只在ReceiveAsync()上循环的原因,因此为什么直接挂钩ActionBlockTransformBlock是行不通的。如果我删除TryReceiveAll()我的版本似乎按预期工作;不过,正如其他评论所指出的那样,对于不同的人来说似乎是不同的,所以这可能是巧合。

这是一个失败的示例...将其放入引用和使用System.Threading.Tasks.Dataflow.dll的控制台应用程序中:

using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
Run Code Online (Sandbox Code Playgroud)

失败的例子:

class Program
{
    static void Main(string[] args)
    {
        var context = new CancellationTokenSource();
        var buffer = new BufferBlock<int>(new DataflowBlockOptions { CancellationToken = context.Token });
        var task = Task.Run(() =>ProcessBuffer(buffer, context.Token), context.Token);

        // shove 10 things onto the buffer …
Run Code Online (Sandbox Code Playgroud)

c# task-parallel-library tpl-dataflow

5
推荐指数
0
解决办法
1292
查看次数

使用 HttpClient 和 Polly 发送并行请求,但每个主机只有一个,以优雅地处理 429 响应

介绍:

我正在构建一个单节点网络爬虫来简单地验证200 OK.NET Core 控制台应用程序中的URL 。我在不同的主机上有一组 URL,我用HttpClient. 我对使用 Polly 和 TPL Dataflow 还很陌生。

要求:

  1. 我想支持与可配置的MaxDegreeOfParallelism.
  2. 我想将任何给定主机的并行请求数限制为 1(或可配置)。这是为了429 TooManyRequests使用 Polly 策略优雅地处理每个主机的响应。或者,我可以使用断路器在收到一个429响应时取消对同一主机的并发请求,然后一次一个地处理该特定主机?
  3. 我完全没有使用 TPL 数据流,而是使用 Polly Bulkhead 或其他一些机制来限制并行请求,但我不确定为了实现需求 #2 的配置会是什么样子。

当前实施:

我当前的实现是有效的,除了我经常看到x对同一主机的并行请求429大约在同一时间返回......然后,他们都暂停重试策略......然后,他们都猛烈抨击同一台主机再次同时经常仍然收到429s。即使我在整个队列中均匀分布同一主机的多个实例,我的 URL 集合也会因一些429最终仍开始生成s 的特定主机而超重。

收到 a 后429,我想我只想向该主机发送一个并发请求,以尊重远程主机并追求200s。

验证器方法:

public async Task<int> GetValidCount(IEnumerable<Uri> urls, CancellationToken cancellationToken)
{
    var validator = new TransformBlock<Uri, bool>(
        async u => (await _httpClient.GetAsync(u, HttpCompletionOption.ResponseHeadersRead, cancellationToken)).IsSuccessStatusCode, …
Run Code Online (Sandbox Code Playgroud)

c# web-crawler tpl-dataflow .net-core polly

5
推荐指数
1
解决办法
697
查看次数

TPL 数据流的 AsyncLocal 值不正确

考虑这个例子:

class Program

{
    private static readonly ITargetBlock<string> Mesh = CreateMesh();
    private static readonly AsyncLocal<string> AsyncLocalContext
        = new AsyncLocal<string>();

    static async Task Main(string[] args)
    {
        var tasks = Enumerable.Range(1, 4)
            .Select(ProcessMessage);
        await Task.WhenAll(tasks);

        Mesh.Complete();
        await Mesh.Completion;

        Console.WriteLine();
        Console.WriteLine("Done");
    }

    private static async Task ProcessMessage(int number)
    {
        var param = number.ToString();
        using (SetScopedAsyncLocal(param))
        {
            Console.WriteLine($"Before send {param}");
            await Mesh.SendAsync(param);
            Console.WriteLine($"After send {param}");
        }
    }

    private static IDisposable SetScopedAsyncLocal(string value)
    {
        AsyncLocalContext.Value = value;

        return new Disposer(() => AsyncLocalContext.Value = null);
    } …
Run Code Online (Sandbox Code Playgroud)

.net c# tpl-dataflow .net-core

5
推荐指数
1
解决办法
445
查看次数

取消TPL数据流块的正确方法

我正在使用TPL块来执行可能被用户取消的操作:我提出了两个选项,首先我取消整个块但不取消块内的操作,如下所示:

_downloadCts = new CancellationTokenSource();

var processBlockV1 = new TransformBlock<int, List<int>>(construct =>
{
    List<int> properties = GetPropertiesMethod(construct );
    var entities = properties
        .AsParallel()
        .Select(DoSometheningWithData)
        .ToList();
    return entities;
}, new ExecutionDataflowBlockOptions() { CancellationToken = _downloadCts.Token });
Run Code Online (Sandbox Code Playgroud)

第二个我取消内部操作,但不是块本身:

var processBlockV2 = new TransformBlock<int, List<int>>(construct =>
{
    List<int> properties = GetPropertiesMethod(construct);
    var entities = properties
        .AsParallel().WithCancellation(_downloadCts.Token)
        .Select(DoSometheningWithData)
        .ToList();
    return entities;
});
Run Code Online (Sandbox Code Playgroud)

据我了解,第一个选项将取消整个块,从而关闭整个管道。我的问题是它是否也会取消内部操作并处理所有资源(如果有)(打开 StreamReaders 等),或者最好选择第二个选项,然后我自己可以确保所有内容都被取消和清理,然后我可以使用一些方法(铁路编程)漂浮OperationCanceledException在管道上并在我想要的地方处理它?

c# task-parallel-library cancellationtokensource tpl-dataflow

5
推荐指数
0
解决办法
156
查看次数