预告:伙计们,这个问题不是关于如何实施重试政策.这是关于正确完成TPL数据流块.
这个问题主要是我之前在ITargetBlock中重试策略的问题的延续.这个问题的答案是@ svick使用TransformBlock(源)和TransformManyBlock(目标)的智能解决方案.剩下的唯一问题是以正确的方式完成此块:等待所有重试首先完成,然后完成目标块.这是我最终得到的结果(它只是一个片段,不要过多关注非线程安全retries集):
var retries = new HashSet<RetryingMessage<TInput>>();
TransformManyBlock<RetryableMessage<TInput>, TOutput> target = null;
target = new TransformManyBlock<RetryableMessage<TInput>, TOutput>(
async message =>
{
try
{
var result = new[] { await transform(message.Data) };
retries.Remove(message);
return result;
}
catch (Exception ex)
{
message.Exceptions.Add(ex);
if (message.RetriesRemaining == 0)
{
if (failureHandler != null)
failureHandler(message.Exceptions);
retries.Remove(message);
}
else
{
retries.Add(message);
message.RetriesRemaining--;
Task.Delay(retryDelay)
.ContinueWith(_ => target.Post(message));
}
return null;
}
}, dataflowBlockOptions);
source.LinkTo(target);
source.Completion.ContinueWith(async …Run Code Online (Sandbox Code Playgroud) 我想运行一堆异步任务,并限制在任何给定时间可以完成的任务数量.
假设您有1000个网址,并且您只希望一次打开50个请求; 但只要一个请求完成,您就会打开与列表中下一个URL的连接.这样,一次只打开50个连接,直到URL列表用完为止.
如果可能的话,我也想利用给定数量的线程.
我提出了一种扩展方法,ThrottleTasksAsync可以实现我想要的功能.那里有更简单的解决方案吗?我认为这是一种常见的情况.
用法:
class Program
{
static void Main(string[] args)
{
Enumerable.Range(1, 10).ThrottleTasksAsync(5, 2, async i => { Console.WriteLine(i); return i; }).Wait();
Console.WriteLine("Press a key to exit...");
Console.ReadKey(true);
}
}
Run Code Online (Sandbox Code Playgroud)
这是代码:
static class IEnumerableExtensions
{
public static async Task<Result_T[]> ThrottleTasksAsync<Enumerable_T, Result_T>(this IEnumerable<Enumerable_T> enumerable, int maxConcurrentTasks, int maxDegreeOfParallelism, Func<Enumerable_T, Task<Result_T>> taskToRun)
{
var blockingQueue = new BlockingCollection<Enumerable_T>(new ConcurrentBag<Enumerable_T>());
var semaphore = new SemaphoreSlim(maxConcurrentTasks);
// Run the throttler on a separate thread.
var t = Task.Run(() => …Run Code Online (Sandbox Code Playgroud) 我对通过Post()或SendAsync()发送项目之间的区别感到困惑.我的理解是,在所有情况下,一旦项目到达数据块的输入缓冲区,控制权将返回到调用上下文,对吗?那为什么我需要SendAsync?如果我的假设不正确,那么我想,相反,如果使用数据块的整个想法是建立并发和异步环境,为什么有人会使用Post().
我当然理解技术上的差异,Post()返回一个bool,而SendAsync返回一个等待bool的任务.但是它有什么影响呢?何时返回bool(我理解是否确认该项是否放在数据块的队列中)会被延迟?我理解async/await并发框架的一般概念,但在这里并没有多大意义,因为除了bool之外,对传入项所做的任何操作的结果都不会返回给调用者,而是放在一个"out-queue"并转发到链接数据块或丢弃.
发送项目时两种方法之间是否存在性能差异?
c# concurrency message-passing task-parallel-library tpl-dataflow
我正在尝试使用TPL Dataflow来创建管道.到目前为止一切正常,我的管道定义如下(虽然我的问题只是广播员,submissionSucceeded,submissionFailed):
// Define tasks
var productListingBatchBuffer = new BufferBlock<PostSubmissionState>();
var splitFile = new TransformBlock<PostSubmissionState, PostSubmissionState>(s => SplitFile(s));
var saveFile = new TransformBlock<PostSubmissionState, PostSubmissionState>(s => SaveFile(s));
var postSubmission = new TransformBlock<PostSubmissionState, PostSubmissionState>(s => PostSubmission(s));
var broadcaster = new BroadcastBlock<PostSubmissionState>(state => state);
var submissionSucceeded = new ActionBlock<PostSubmissionState>(s => SubmissionSucceeded(s));
var submissionFailed = new ActionBlock<PostSubmissionState>(s => SubmissionFailed(s));
// Link em up
productListingBatchBuffer.LinkTo(splitFile, new DataflowLinkOptions() { PropagateCompletion = true });
splitFile.LinkTo(saveFile, new DataflowLinkOptions() { PropagateCompletion = true });
saveFile.LinkTo(postSubmission, new DataflowLinkOptions() { PropagateCompletion …Run Code Online (Sandbox Code Playgroud) 两个完成转换后,如何重新编写代码完成的代码?我认为完成意味着它被标记为完成并且"出队列"是空的?
public Test()
{
broadCastBlock = new BroadcastBlock<int>(i =>
{
return i;
});
transformBlock1 = new TransformBlock<int, string>(i =>
{
Console.WriteLine("1 input count: " + transformBlock1.InputCount);
Thread.Sleep(50);
return ("1_" + i);
});
transformBlock2 = new TransformBlock<int, string>(i =>
{
Console.WriteLine("2 input count: " + transformBlock1.InputCount);
Thread.Sleep(20);
return ("2_" + i);
});
processorBlock = new ActionBlock<string>(i =>
{
Console.WriteLine(i);
});
//Linking
broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
transformBlock1.LinkTo(processorBlock, new DataflowLinkOptions { …Run Code Online (Sandbox Code Playgroud) 我正在寻找一个TPL数据流块解决方案,它可以容纳多个项目,可以链接到多个目标块,但是它能够将项目转发到仅通过过滤器/谓词的特定目标块.在任何时候都不应该将项目同时传递到多个目标块,始终只能传递给与过滤器匹配的项目,或者可以丢弃该项目.我不喜欢BroadCastBlock,因为如果我理解正确,它不能保证传送(或者是吗?)并且过滤是在目标块侧完成的,这意味着BroadCastBlock基本上将每个项目的副本发送到所有linkedTo目标块.如果我理解正确,它也不会在任何时候持有多个项目.我不想使用Post/Async但维护LinkTo链.
有没有办法绕过完整的自定义数据流块?或者我误解了BroadCastBlock的工作原理?不幸的是,实际上没有太多文档可以详细介绍并涵盖用例.任何想法都受到高度赞赏.
var buffer = new BufferBlock<object>();
var producer = Task.Run(async () =>
{
while (true)
{
await Task.Delay(TimeSpan.FromMilliseconds(100));
buffer.Post(null);
Console.WriteLine("Post " + buffer.Count);
}
});
var consumer = Task.Run(async () =>
{
while (await buffer.OutputAvailableAsync())
{
IList<object> items;
buffer.TryReceiveAll(out items);
Console.WriteLine("TryReceiveAll " + buffer.Count);
}
});
await Task.WhenAll(consumer, producer);
Run Code Online (Sandbox Code Playgroud)
生产者应该每隔100毫秒将项目发布到缓冲区,并且消费者应该清除缓冲区中的所有项目并异步地等待更多项目显示.
实际发生的是制作人清除所有项目一次,然后再也不会超越OutputAvailableAsync.如果我切换消费者逐个删除项目,它将作为例外工作:
while (await buffer.OutputAvailableAsync())
{
object item;
while (buffer.TryReceive(out item)) ;
}
Run Code Online (Sandbox Code Playgroud)
我误会了什么吗?如果没有,问题是什么?
有2种不同的官方TPL Dataflow nuget包.我很困惑,选择我应该使用哪一个.
据我所知,System.Threading.Tasks.Dataflow版本比其他版本稍微新一点,似乎System.Threading.Tasks.Dataflow是针对最新版本的.net.
谁能解释这些之间的差异?
我使用TPL Dataflow API编写了一个小管道,它从多个线程接收数据并对它们执行处理.
当我为每个块配置它使用MaxDegreeOfParallelism = Environment.ProcessorCount(8在我的情况下)时,我注意到它填充多个线程中的缓冲区并且处理第二个块不会开始直到所有线程都收到+ - 1700个元素.你可以在这里看到这个.
当我设置MaxDegreeOfParallelism = 1然后我注意到在一个线程上接收所有元素并且在接收到+40个元素之后处理发送已经开始.数据在这里.
当我设置MaxDegreeOfParallelism = 1并在发送每个输入之前引入1000ms的延迟时,我注意到元素在收到后立即发送,并且每个接收的元素都放在一个单独的线程上.数据在这里.
到目前为止的设置.我的问题如下:
当我比较设置1和2时,我注意到与并行相比,在串行完成时处理元素的启动速度要快得多(即使考虑到并行具有8倍的线程数).是什么导致这种差异?
由于这将在ASP.NET环境中运行,因此我不想生成不必要的线程,因为它们都来自单个线程池.如设置3所示,即使只有少量数据,它仍会在多个线程上传播.这也是令人惊讶的,因为从设置1我会假设数据在线程上顺序传播(注意前50个元素都是如何进入线程16).我可以确保它只按需创建新线程吗?
还有另一个概念称为BufferBlock<T>.如果TransformBlock<T>已经排队输入,那么在我的管道(ReceiveElement)中交换第一步的实际区别是BufferBlock什么?
class Program
{
static void Main(string[] args)
{
var dataflowProcessor = new DataflowProcessor<string>();
var amountOfTasks = 5;
var tasks = new Task[amountOfTasks];
for (var i = 0; i < amountOfTasks; i++)
{
tasks[i] = SpawnThread(dataflowProcessor, $"Task {i …Run Code Online (Sandbox Code Playgroud) 我有一块C#5.0代码,可以生成大量的网络和磁盘I/O. 我需要并行运行此代码的多个副本.以下哪种技术可能会给我带来最佳性能:
等待的异步方法
直接使用TPL中的Task
TPL数据流nuget
反应性扩展
我不太擅长这种平行的东西,但如果使用较低的杠杆,比如说Thread,可以给我更好的性能,我也会考虑.
conceptual task-parallel-library system.reactive async-await tpl-dataflow
tpl-dataflow ×10
c# ×9
async-await ×4
concurrency ×2
.net ×1
actor ×1
asynchronous ×1
conceptual ×1
exception ×1
semaphore ×1
throttling ×1