标签: tpl-dataflow

一起使用 BlockingCollection<T> 和 TPL 数据流时出现死锁

我已经编写了一个复制该问题的示例测试。这不是我的实际代码,我尝试编写一个小代码。如果将边界容量增加到迭代次数,从而有效地使其没有边界,则不会出现死锁,如果将最大并行度设置为较小的数字(例如 1),则不会出现死锁。

再说一次,我知道下面的代码不是很好,但我实际发现的代码要大得多并且难以理解。基本上有一个与远程资源的连接的阻塞对象池,并且流中的几个块使用了该连接。

关于如何解决这个问题有什么想法吗?乍一看,这似乎是数据流的问题。当我中断查看线程时,我看到许多线程在 Add 上被阻塞,0 个线程在 take 上被阻塞。addBlocks 出站队列中有几个项目尚未传播到 takeblock,因此它被卡住或死锁。

    var blockingCollection = new BlockingCollection<int>(10000);

    var takeBlock = new ActionBlock<int>((i) =>
    {
        int j = blockingCollection.Take();

    }, new ExecutionDataflowBlockOptions()
           {
              MaxDegreeOfParallelism = 20,
              SingleProducerConstrained = true
           });

    var addBlock = new TransformBlock<int, int>((i) => 
    {
        blockingCollection.Add(i);
        return i;

    }, new ExecutionDataflowBlockOptions()
           {
              MaxDegreeOfParallelism = 20
           });

    addBlock.LinkTo(takeBlock, new DataflowLinkOptions()
          {
             PropagateCompletion = true
          });

    for (int i = 0; i < 100000; i++)
    {
        addBlock.Post(i);
    }

    addBlock.Complete();
    await addBlock.Completion; …
Run Code Online (Sandbox Code Playgroud)

c# multithreading task-parallel-library blockingcollection tpl-dataflow

5
推荐指数
1
解决办法
877
查看次数

多次调用 GetStringAsync 的更有效方法?

我有(我的网址列表大约有 1000 个网址),我想知道是否有更有效的方法从同一站点调用多个网址(已经更改了ServicePointManager.DefaultConnectionLimit)。

另外,是在每次调用时重用相同的HttpClient还是创建新的更好,下面仅使用一个而不是多个。

using (var client = new HttpClient { Timeout = new TimeSpan(0, 5, 0) })
{
    var tasks = urls.Select(async url =>
    {
        await client.GetStringAsync(url).ContinueWith(response =>
        {
           var resultHtml = response.Result;
           //process the html

        });
    }).ToList();

    Task.WaitAll(tasks.ToArray());
}
Run Code Online (Sandbox Code Playgroud)

正如@cory建议的,
这里是使用的修改后的代码TPL,但是我必须设置MaxDegreeOfParallelism = 100以达到与基于任务的速度大致相同的速度,下面的代码可以改进吗?

var downloader = new ActionBlock<string>(async url =>
{
    var client = new WebClient();
    var resultHtml = await client.DownloadStringTaskAsync(new Uri(url));


}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 100 }); …
Run Code Online (Sandbox Code Playgroud)

c# multithreading task-parallel-library tpl-dataflow

5
推荐指数
1
解决办法
1391
查看次数

TPL 数据流的 AsyncLocal 值不正确

考虑这个例子:

class Program

{
    private static readonly ITargetBlock<string> Mesh = CreateMesh();
    private static readonly AsyncLocal<string> AsyncLocalContext
        = new AsyncLocal<string>();

    static async Task Main(string[] args)
    {
        var tasks = Enumerable.Range(1, 4)
            .Select(ProcessMessage);
        await Task.WhenAll(tasks);

        Mesh.Complete();
        await Mesh.Completion;

        Console.WriteLine();
        Console.WriteLine("Done");
    }

    private static async Task ProcessMessage(int number)
    {
        var param = number.ToString();
        using (SetScopedAsyncLocal(param))
        {
            Console.WriteLine($"Before send {param}");
            await Mesh.SendAsync(param);
            Console.WriteLine($"After send {param}");
        }
    }

    private static IDisposable SetScopedAsyncLocal(string value)
    {
        AsyncLocalContext.Value = value;

        return new Disposer(() => AsyncLocalContext.Value = null);
    } …
Run Code Online (Sandbox Code Playgroud)

.net c# tpl-dataflow .net-core

5
推荐指数
1
解决办法
445
查看次数

取消TPL数据流块的正确方法

我正在使用TPL块来执行可能被用户取消的操作:我提出了两个选项,首先我取消整个块但不取消块内的操作,如下所示:

_downloadCts = new CancellationTokenSource();

var processBlockV1 = new TransformBlock<int, List<int>>(construct =>
{
    List<int> properties = GetPropertiesMethod(construct );
    var entities = properties
        .AsParallel()
        .Select(DoSometheningWithData)
        .ToList();
    return entities;
}, new ExecutionDataflowBlockOptions() { CancellationToken = _downloadCts.Token });
Run Code Online (Sandbox Code Playgroud)

第二个我取消内部操作,但不是块本身:

var processBlockV2 = new TransformBlock<int, List<int>>(construct =>
{
    List<int> properties = GetPropertiesMethod(construct);
    var entities = properties
        .AsParallel().WithCancellation(_downloadCts.Token)
        .Select(DoSometheningWithData)
        .ToList();
    return entities;
});
Run Code Online (Sandbox Code Playgroud)

据我了解,第一个选项将取消整个块,从而关闭整个管道。我的问题是它是否也会取消内部操作并处理所有资源(如果有)(打开 StreamReaders 等),或者最好选择第二个选项,然后我自己可以确保所有内容都被取消和清理,然后我可以使用一些方法(铁路编程)漂浮OperationCanceledException在管道上并在我想要的地方处理它?

c# task-parallel-library cancellationtokensource tpl-dataflow

5
推荐指数
0
解决办法
156
查看次数

TPL数据流与异步操作一起使用

我正在通过移植一些旧的套接字代码以使用TPL数据流和新的异步功能来尝试TPL数据流。尽管API感觉很坚如磐石,但我的代码仍然最终变得混乱。我想知道我是否在这里错过了什么。

我的要求如下:一个套接字类公开:Open,Close,Send和Receive方法。全部都返回一个Task,因此是异步的。打开和关闭是原子的。尽管发送和接收一次只能处理1条命令,但它们可以彼此相邻工作。

从逻辑上讲,这使我进入下一个内部控制代码:

// exposing an exclusive scheduler for connectivity related tasks and a parallel scheduler where send and receive can work with
private readonly ConcurrentExclusiveSchedulerPair exclusiveConnectionSchedulerPair;
private readonly ActionBlock<Action> connectionBlock;
private readonly ActionBlock<Action> sendBlock;
private readonly ActionBlock<Action> receiveBlock;

// within the constructor:
this.exclusiveConnectionSchedulerPair = new ConcurrentExclusiveSchedulerPair();
this.connectionBlock = new ActionBlock<Action>(action => action(), new ExecutionDataflowBlockOptions()  { TaskScheduler = exclusiveConnectionSchedulerPair.ExclusiveScheduler });
this.sendBlock = new ActionBlock<Action>(action => action(), new ExecutionDataflowBlockOptions()    { TaskScheduler = exclusiveConnectionSchedulerPair.ConcurrentScheduler });
this.receiveBlock = new ActionBlock<Action>(action => action(), new …
Run Code Online (Sandbox Code Playgroud)

.net c# dataflow task-parallel-library tpl-dataflow

4
推荐指数
1
解决办法
2190
查看次数

TPL DataFlow,优先级链接块?

使用TPL.DataFlow块,是否可以将两个或多个源链接到单个ITargetBlock(例如ActionBlock)并确定源的优先级?

例如

BufferBlock<string> b1 = new ...
BufferBlock<string> b2 = new ...
ActionBlock<string> a = new ...

//somehow force messages in b1 to be processed before any message of b2, always
b1.LinkTo (a);
b2.LinkTo (a);
Run Code Online (Sandbox Code Playgroud)

只要b1中有消息,我希望这些消息被输入"a",一旦b1为空,b2消息就被推送到"a"

想法?

c# task-parallel-library tpl-dataflow

4
推荐指数
1
解决办法
1290
查看次数

在.NET中并行抓取

我工作的公司运行着几百个非常动态的网站.它决定建立一个搜索引擎,我的任务是编写刮刀.有些网站在旧硬件上运行,无法承受太多惩罚,而其他网站则可以处理大量的并发用户.

我需要能够说对站点A使用5个并行请求,对站点B使用2个,对站点C使用1个并行请求.

我知道我可以使用线程,互斥体,信号量等来实现这一目标,但它会非常复杂.是否有任何更高级别的框架,如TPL,await/async,TPL Dataflow足够强大,能够以更简单的方式完成此应用程序?

.net c# task-parallel-library async-await tpl-dataflow

4
推荐指数
1
解决办法
578
查看次数

TPL数据流块,它将消息的前向延迟到下一个块

我需要一个Dataflow块,它根据消息中的时间戳(LogEntry)将消息的转发延迟到下一个块.

这就是我提出的,但感觉不对.有任何改进建议吗?

  private IPropagatorBlock<LogEntry, LogEntry> DelayedForwardBlock()
    {
        var buffer = new ConcurrentQueue<LogEntry>();

        var source = new BufferBlock<LogEntry>();

        var target = new ActionBlock<LogEntry>(item =>
        {
            buffer.Enqueue(item);
        });


        Task.Run(() =>
            {
                LogEntry entry;
                while (true)
                {
                    entry = null;
                    if (buffer.TryPeek(out entry))
                    {
                        if (entry.UtcTimestamp < (DateTime.UtcNow - TimeSpan.FromMinutes(5)))
                        {
                            buffer.TryDequeue(out entry);
                            source.Post(entry);
                        }
                    }
                }
            });


        target.Completion.ContinueWith(delegate
        {
            LogEntry entry;
            while (buffer.TryDequeue(out entry))
            {
                source.Post(entry);
            }

            source.Complete();
        });

        return DataflowBlock.Encapsulate(target, source);
    }
Run Code Online (Sandbox Code Playgroud)

.net c# task-parallel-library tpl-dataflow

4
推荐指数
1
解决办法
733
查看次数

使用TaskCompletionSource与BufferBlock <T>包装事件

Lucian在这里讨论了一种模式(技巧3:在任务返回API中包装事件并等待它们).

我试图在一个经常调用的方法上实现它,看起来像下面的设计代码:

public Task BlackBoxAsync() 
{ 
    var tcs = new TaskCompletionSource<Object>();  // new'ed up every call
    ThreadPool.QueueUserWorkItem(_ => 
    { 
        try 
        { 
            DoSomethingStuff(); 
            tcs.SetResult(null); 
        } 
        catch(Exception exc) { tcs.SetException(exc); } 
    }); 
    return tcs.Task; 
}
Run Code Online (Sandbox Code Playgroud)

我很担心性能,TaskCompletionSource每次通话都是新的(让我们说我每100毫秒调用一次这个方法).

我当时正在考虑使用BufferBlock<T>,认为每次通话都不会重新开始.所以它看起来像:

private readonly BufferBlock<object> signalDone; // dummy class-level variable, new'ed up once in CTOR

public Task BlackBoxAsync() 
{ 

    ThreadPool.QueueUserWorkItem(_ => 
    { 
        try 
        { 
            DoSomethingStuff(); 
            signalDone.Post(null);
        } 
        catch(Exception exc) {  } 
    }); 
    return signalDone.ReceiveAsync(); 
}
Run Code Online (Sandbox Code Playgroud)

调用对象会将其称为:

for (var i=0; i<10000; i++) …
Run Code Online (Sandbox Code Playgroud)

c# task-parallel-library async-await tpl-dataflow taskcompletionsource

4
推荐指数
1
解决办法
1056
查看次数

为什么块按此顺序运行?

这是一个简短的代码示例,可以快速向您介绍我的问题:

using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace DataflowTest
{
    class Program
    {
        static void Main(string[] args)
        {
            var firstBlock = new TransformBlock<int, int>(x => x, new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4
            });
            var secondBlock = new TransformBlock<int,string>(async x =>
            {
                if (x == 12)
                {
                    await Task.Delay(5000);
                    return $"{DateTime.Now}: Message is {x} (This is delayed message!) ";
                }

                return $"{DateTime.Now}: Message is {x}";
            }, new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4
            });
            var thirdBlock = new ActionBlock<string>(s => …
Run Code Online (Sandbox Code Playgroud)

.net c# dataflow task-parallel-library tpl-dataflow

4
推荐指数
1
解决办法
782
查看次数