鉴于以下情况:
BufferBlock<int> sourceBlock = new BufferBlock<int>();
TransformBlock<int, int> targetBlock = new TransformBlock<int, int>(element =>
{
return element * 2;
});
sourceBlock.LinkTo(targetBlock, new DataflowLinkOptions { PropagateCompletion = true });
//feed some elements into the buffer block
for(int i = 1; i <= 1000000; i++)
{
sourceBlock.SendAsync(i);
}
sourceBlock.Complete();
targetBlock.Completion.ContinueWith(_ =>
{
//notify completion of the target block
});
Run Code Online (Sandbox Code Playgroud)
在targetBlock似乎永远不会完成,我想原因是,在所有的项目TransformBlock targetBlock在输出队列中等待,因为我还没有链接的targetBlock其他任何数据流块。但是,我真正想要实现的是当(A)targetBlock通知完成和(B)输入队列为空时的通知。我不想关心项目是否仍然位于TransformBlock. 我该怎么办?是让我想查询的完成状态什么的唯一方式sourceBlock,并确保了InputCount的targetBlock是零?我不确定这是否非常稳定(sourceBlock如果中的最后一项sourceBlock已传递给 …
c# concurrency message-passing task-parallel-library tpl-dataflow
我寻找 JoinBlock 的替代方案,它可以通过 n-TransformBlocks 链接,并将所有 TransformBlock 源块的消息连接/合并在一起,以便将此类的集合传递给另一个数据流块。
JoinBlock 可以很好地完成工作,但仅限于连接 3 个源块。它还存在许多效率低下的问题(连接 2 个源块的偶数值类型(整数)非常慢)。有没有办法让任务从 TransformBlocks 返回并等到所有 TransformBlocks 都有一个完成的任务才能接受Task<item>?
任何替代想法?我可能有 1-20 个这样的转换块,在传递连接的项目集合之前,我需要将哪些项目连接在一起。每个转换块都保证为每个“转换”的输入项准确返回一个输出项。
编辑:要求澄清:
根据我之前的一个问题,我按如下方式设置了我的 JoinBlock:
public Test()
{
broadCastBlock = new BroadcastBlock<int>(i =>
{
return i;
});
transformBlock1 = new TransformBlock<int, int>(i =>
{
return i;
});
transformBlock2 = new TransformBlock<int, int>(i =>
{
return i;
});
joinBlock = new JoinBlock<int, int>();
processorBlock = new ActionBlock<Tuple<int, int>>(tuple =>
{
//Console.WriteLine("tfb1: " + tuple.Item1 + "tfb2: " + tuple.Item2);
});
//Linking …Run Code Online (Sandbox Code Playgroud) c# concurrency actor-model task-parallel-library tpl-dataflow
我以前从未使用过 TPL,所以我想知道是否可以用它来完成:我的应用程序从很多帧创建了一个 gif 图像动画文件。我从一个表示 gif 文件帧的 Bitmap 列表开始,需要对每一帧执行以下操作:
显然,这个过程可以对列表中的所有帧并行完成,但对于每个帧,步骤的顺序需要相同。之后,我需要将所有帧写入 gif 文件。因此,所有帧都需要按照它们在原始列表中的相同顺序接收。最重要的是,这个过程可以在第一帧准备好时开始,不需要等到所有帧都处理完。
所以情况就是这样。TPL Dataflow 适合这个吗?如果是的话,谁能给我一个关于如何设计 tpl 块结构以反映上述过程的正确方向的提示?与我发现的一些样本相比,这对我来说似乎相当复杂。
鉴于 TPL 数据流中的以下设置。
var directory = new DirectoryInfo(@"C:\dev\kortforsyningen_dsm\tiles");
var dirBroadcast=new BroadcastBlock<DirectoryInfo>(dir=>dir);
var dirfinder = new TransformManyBlock<DirectoryInfo, DirectoryInfo>((dir) =>
{
return directory.GetDirectories();
});
var tileFilder = new TransformManyBlock<DirectoryInfo, FileInfo>((dir) =>
{
return directory.GetFiles();
});
dirBroadcast.LinkTo(dirfinder);
dirBroadcast.LinkTo(tileFilder);
dirfinder.LinkTo(dirBroadcast);
var block = new XYZTileCombinerBlock<FileInfo>(3, (file) =>
{
var coordinate = file.FullName.Split('\\').Reverse().Take(3).Reverse().Select(s => int.Parse(Path.GetFileNameWithoutExtension(s))).ToArray();
return XYZTileCombinerBlock<CloudBlockBlob>.TileXYToQuadKey(coordinate[0], coordinate[1], coordinate[2]);
},
(quad) =>
XYZTileCombinerBlock<FileInfo>.QuadKeyToTileXY(quad,
(z, x, y) => new FileInfo(Path.Combine(directory.FullName,string.Format("{0}/{1}/{2}.png", z, x, y)))),
() => new TransformBlock<string, string>((s) =>
{
Trace.TraceInformation("Combining {0}", s);
return s;
})); …Run Code Online (Sandbox Code Playgroud) 自从对我的传播完成管道进行了最后一次更改后,我的一个缓冲区块从未完成。让我总结一下什么是有效的,什么不再是:
以前工作:
A.LinkTo(B, PropagateCompletion);
B.LinkTo(C, PropagateCompletion);
C.LinkTo(D, PropagateCompletion);
D.Receive();
// everything completes
Run Code Online (Sandbox Code Playgroud)
不再工作:
A.LinkTo(B, PropagateCompletion);
C.LinkTo(D, PropagateCompletion);
await A.Completion;
someWriteOnceBlock.Post(B.Count);
// B.Complete(); commented on purpose
B.LinkTo(C, PropagateCompletion);
D.Receive();
// Only A reaches completion
// B remains in 'waiting for activation'
// C executes but obviously never completes since B doesn't either
Run Code Online (Sandbox Code Playgroud)
如果我取消注释注释行,一切正常,但显然该行不是必需的。
不知何故,我的 BufferBlock B 永远不会完成,即使链接到它的块已完成并传播其完成,并且从它链接的块接收所有缓冲项。
我有大量任务(~1000)需要执行。我在 4 核处理器上运行,所以我想一次并行处理 4 个任务。
为了给您一个起点,这里有一些示例代码。
class Program
{
public class LongOperation
{
private static readonly Random RandomNumberGenerator = new Random(0);
const int UpdateFrequencyMilliseconds = 100;
public int CurrentProgress { get; set; }
public int TargetProcess { get; set; }
public LongOperation()
{
TargetProcess = RandomNumberGenerator.Next(
(int)TimeSpan.FromSeconds(5).TotalMilliseconds / UpdateFrequencyMilliseconds,
(int)TimeSpan.FromSeconds(10).TotalMilliseconds / UpdateFrequencyMilliseconds);
}
public async Task Execute()
{
while (!IsCompleted)
{
await Task.Delay(UpdateFrequencyMilliseconds);
CurrentProgress++;
}
}
public bool IsCompleted => CurrentProgress >= TargetProcess;
}
static void Main(string[] args)
{
Task.Factory.StartNew(async …Run Code Online (Sandbox Code Playgroud) 在我的应用程序中,我想将多个字符串与替换值字典连接起来。
在readTemplateBlock获取与FileInfos并返回其作为字符串内容喂养。
该getReplacersBlock被供给(一次)与单个代用品字典。
本joinTemplateAndReplacersBlock应参加的每个项目readTemplateBlock与一个getReplacersBlock结果。
在我当前的设置中,它要求我为我发布的每个文件再次发布相同的替换字典。
// Build
var readTemplateBlock = new TransformBlock<FileInfo, string>(file => File.ReadAllText(file.FullName));
var getReplacersBlock = new WriteOnceBlock<IDictionary<string, string>>(null);
var joinTemplateAndReplacersBlock = new JoinBlock<string, IDictionary<string, string>>();
// Assemble
var propagateComplete = new DataflowLinkOptions {PropagateCompletion = true};
readTemplateBlock.LinkTo(joinTemplateAndReplacersBlock.Target1, propagateComplete);
getReplacersBlock.LinkTo(joinTemplateAndReplacersBlock.Target2, propagateComplete);
joinTemplateAndReplacersBlock.LinkTo(replaceTemplateBlock, propagateComplete);
// Post
foreach (var template in templateFilenames)
{
getFileBlock.Post(template);
}
getFileBlock.Complete();
getReplacersBlock.Post(replacers);
getReplacersBlock.Complete();
Run Code Online (Sandbox Code Playgroud)
有没有更好的块我错过了?也许我忽略了一个配置选项?
我是 TPL 数据流的新手。
我正在尝试为相当快速的输入流构建一个节流异步更新。BufferBlock 似乎很适合这个想法,因为我可以调用 ReceiveAll() 来从缓冲区中获取所有内容,并且在某些情况下,我无法在 ReceiveAsync() 上等待以在它到达时拾取下一个元素。
但它似乎有时会挂在 ReceiveAsync() 调用上;并且失败的条件很奇怪。
请注意,我对为什么会挂起很感兴趣。我已经找到了另一种使我正在处理的应用程序工作的方法,但它可能不像我没有使用 TPL Dataflow 那样简洁或可扩展,因为我显然不了解它是如何工作的。
进一步说明这里的关键用法是我执行TryReceiveAll()然后等待 ReceiveAsync()如果失败。这是突发数据到达的常见模式,我想将数据作为批处理进行处理。这就是我不想只在ReceiveAsync()上循环的原因,因此为什么直接挂钩ActionBlock或TransformBlock是行不通的。如果我删除TryReceiveAll()我的版本似乎按预期工作;不过,正如其他评论所指出的那样,对于不同的人来说似乎是不同的,所以这可能是巧合。
这是一个失败的示例...将其放入引用和使用System.Threading.Tasks.Dataflow.dll的控制台应用程序中:
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
Run Code Online (Sandbox Code Playgroud)
失败的例子:
class Program
{
static void Main(string[] args)
{
var context = new CancellationTokenSource();
var buffer = new BufferBlock<int>(new DataflowBlockOptions { CancellationToken = context.Token });
var task = Task.Run(() =>ProcessBuffer(buffer, context.Token), context.Token);
// shove 10 things onto the buffer …Run Code Online (Sandbox Code Playgroud) 我有一个相当简单的生产者-消费者模式,其中(简化)我有两个生产者,他们生产由一个消费者消费的输出。
为此,我使用 System.Threading.Tasks.Dataflow.BufferBlock<T>
一个BufferBlock对象被创建。一个Consumer是听这个BufferBlock,并处理任何接收到的输入。
send data to the同时有两个“生产者BufferBlock”
简化:
BufferBlock<int> bufferBlock = new BufferBlock<int>();
async Task Consume()
{
while(await bufferBlock.OutputAvailable())
{
int dataToProcess = await outputAvailable.ReceiveAsync();
Process(dataToProcess);
}
}
async Task Produce1()
{
IEnumerable<int> numbersToProcess = ...;
foreach (int numberToProcess in numbersToProcess)
{
await bufferBlock.SendAsync(numberToProcess);
// ignore result for this example
}
}
async Task Produce2()
{
IEnumerable<int> numbersToProcess = ...;
foreach (int numberToProcess in numbersToProcess)
{
await bufferBlock.SendAsync(numberToProcess);
// ignore result for …Run Code Online (Sandbox Code Playgroud) c# dataflow producer-consumer task-parallel-library tpl-dataflow
介绍:
我正在构建一个单节点网络爬虫来简单地验证200 OK.NET Core 控制台应用程序中的URL 。我在不同的主机上有一组 URL,我用HttpClient. 我对使用 Polly 和 TPL Dataflow 还很陌生。
要求:
MaxDegreeOfParallelism.429 TooManyRequests使用 Polly 策略优雅地处理每个主机的响应。或者,我可以使用断路器在收到一个429响应时取消对同一主机的并发请求,然后一次一个地处理该特定主机?当前实施:
我当前的实现是有效的,除了我经常看到x对同一主机的并行请求429大约在同一时间返回......然后,他们都暂停重试策略......然后,他们都猛烈抨击同一台主机再次同时经常仍然收到429s。即使我在整个队列中均匀分布同一主机的多个实例,我的 URL 集合也会因一些429最终仍开始生成s 的特定主机而超重。
收到 a 后429,我想我只想向该主机发送一个并发请求,以尊重远程主机并追求200s。
验证器方法:
public async Task<int> GetValidCount(IEnumerable<Uri> urls, CancellationToken cancellationToken)
{
var validator = new TransformBlock<Uri, bool>(
async u => (await _httpClient.GetAsync(u, HttpCompletionOption.ResponseHeadersRead, cancellationToken)).IsSuccessStatusCode, …Run Code Online (Sandbox Code Playgroud) tpl-dataflow ×10
c# ×9
.net ×4
dataflow ×3
concurrency ×2
.net-core ×1
actor-model ×1
async-await ×1
asynchronous ×1
polly ×1
web-crawler ×1