我寻找 JoinBlock 的替代方案,它可以通过 n-TransformBlocks 链接,并将所有 TransformBlock 源块的消息连接/合并在一起,以便将此类的集合传递给另一个数据流块。
JoinBlock 可以很好地完成工作,但仅限于连接 3 个源块。它还存在许多效率低下的问题(连接 2 个源块的偶数值类型(整数)非常慢)。有没有办法让任务从 TransformBlocks 返回并等到所有 TransformBlocks 都有一个完成的任务才能接受Task<item>?
任何替代想法?我可能有 1-20 个这样的转换块,在传递连接的项目集合之前,我需要将哪些项目连接在一起。每个转换块都保证为每个“转换”的输入项准确返回一个输出项。
编辑:要求澄清:
根据我之前的一个问题,我按如下方式设置了我的 JoinBlock:
public Test()
{
broadCastBlock = new BroadcastBlock<int>(i =>
{
return i;
});
transformBlock1 = new TransformBlock<int, int>(i =>
{
return i;
});
transformBlock2 = new TransformBlock<int, int>(i =>
{
return i;
});
joinBlock = new JoinBlock<int, int>();
processorBlock = new ActionBlock<Tuple<int, int>>(tuple =>
{
//Console.WriteLine("tfb1: " + tuple.Item1 + "tfb2: " + tuple.Item2);
});
//Linking …Run Code Online (Sandbox Code Playgroud) c# concurrency actor-model task-parallel-library tpl-dataflow
我以前从未使用过 TPL,所以我想知道是否可以用它来完成:我的应用程序从很多帧创建了一个 gif 图像动画文件。我从一个表示 gif 文件帧的 Bitmap 列表开始,需要对每一帧执行以下操作:
显然,这个过程可以对列表中的所有帧并行完成,但对于每个帧,步骤的顺序需要相同。之后,我需要将所有帧写入 gif 文件。因此,所有帧都需要按照它们在原始列表中的相同顺序接收。最重要的是,这个过程可以在第一帧准备好时开始,不需要等到所有帧都处理完。
所以情况就是这样。TPL Dataflow 适合这个吗?如果是的话,谁能给我一个关于如何设计 tpl 块结构以反映上述过程的正确方向的提示?与我发现的一些样本相比,这对我来说似乎相当复杂。
我目前正在尝试使用TransformBlocks来使我的代码运行得更快.相反,我发现我基本上没有实现并行化:

正如您所看到的,存在相当多的死空间,很少有I/O或其他问题阻止事物并行运行(注意:所有绿色块都是主线程).
调用代码的基本结构如下:
var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 };
var download = new TransformBlock<string, Tuple<string, string>>(s => sendAndReciveRequest(s), options);
var process = new TransformBlock<Tuple<string, string, TransformBlock<string, Tuple<string, string>>>, List<string>>(s => Helpers.ParseKBDL(s), options);
var toObjects = new TransformBlock<List<string>, List<Food>>(list => toFood(list), options);
for (char char1 = 'a'; char1 < 'z' + 1; char1++)
download.Post(char1.ToString());
while ((download.InputCount != 0 || download.OutputCount != 0 || process.InputCount != 0) || (Form1.downloadCount != Form1.processCount))
{
if (download.OutputCount == 0 && download.InputCount …Run Code Online (Sandbox Code Playgroud) concurrency performance task-parallel-library c#-5.0 tpl-dataflow
我已经编写了一个复制该问题的示例测试。这不是我的实际代码,我尝试编写一个小代码。如果将边界容量增加到迭代次数,从而有效地使其没有边界,则不会出现死锁,如果将最大并行度设置为较小的数字(例如 1),则不会出现死锁。
再说一次,我知道下面的代码不是很好,但我实际发现的代码要大得多并且难以理解。基本上有一个与远程资源的连接的阻塞对象池,并且流中的几个块使用了该连接。
关于如何解决这个问题有什么想法吗?乍一看,这似乎是数据流的问题。当我中断查看线程时,我看到许多线程在 Add 上被阻塞,0 个线程在 take 上被阻塞。addBlocks 出站队列中有几个项目尚未传播到 takeblock,因此它被卡住或死锁。
var blockingCollection = new BlockingCollection<int>(10000);
var takeBlock = new ActionBlock<int>((i) =>
{
int j = blockingCollection.Take();
}, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 20,
SingleProducerConstrained = true
});
var addBlock = new TransformBlock<int, int>((i) =>
{
blockingCollection.Add(i);
return i;
}, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 20
});
addBlock.LinkTo(takeBlock, new DataflowLinkOptions()
{
PropagateCompletion = true
});
for (int i = 0; i < 100000; i++)
{
addBlock.Post(i);
}
addBlock.Complete();
await addBlock.Completion; …Run Code Online (Sandbox Code Playgroud) c# multithreading task-parallel-library blockingcollection tpl-dataflow
鉴于 TPL 数据流中的以下设置。
var directory = new DirectoryInfo(@"C:\dev\kortforsyningen_dsm\tiles");
var dirBroadcast=new BroadcastBlock<DirectoryInfo>(dir=>dir);
var dirfinder = new TransformManyBlock<DirectoryInfo, DirectoryInfo>((dir) =>
{
return directory.GetDirectories();
});
var tileFilder = new TransformManyBlock<DirectoryInfo, FileInfo>((dir) =>
{
return directory.GetFiles();
});
dirBroadcast.LinkTo(dirfinder);
dirBroadcast.LinkTo(tileFilder);
dirfinder.LinkTo(dirBroadcast);
var block = new XYZTileCombinerBlock<FileInfo>(3, (file) =>
{
var coordinate = file.FullName.Split('\\').Reverse().Take(3).Reverse().Select(s => int.Parse(Path.GetFileNameWithoutExtension(s))).ToArray();
return XYZTileCombinerBlock<CloudBlockBlob>.TileXYToQuadKey(coordinate[0], coordinate[1], coordinate[2]);
},
(quad) =>
XYZTileCombinerBlock<FileInfo>.QuadKeyToTileXY(quad,
(z, x, y) => new FileInfo(Path.Combine(directory.FullName,string.Format("{0}/{1}/{2}.png", z, x, y)))),
() => new TransformBlock<string, string>((s) =>
{
Trace.TraceInformation("Combining {0}", s);
return s;
})); …Run Code Online (Sandbox Code Playgroud) 我有(我的网址列表大约有 1000 个网址),我想知道是否有更有效的方法从同一站点调用多个网址(已经更改了ServicePointManager.DefaultConnectionLimit)。
另外,是在每次调用时重用相同的HttpClient还是创建新的更好,下面仅使用一个而不是多个。
using (var client = new HttpClient { Timeout = new TimeSpan(0, 5, 0) })
{
var tasks = urls.Select(async url =>
{
await client.GetStringAsync(url).ContinueWith(response =>
{
var resultHtml = response.Result;
//process the html
});
}).ToList();
Task.WaitAll(tasks.ToArray());
}
Run Code Online (Sandbox Code Playgroud)
正如@cory建议的,
这里是使用的修改后的代码TPL,但是我必须设置MaxDegreeOfParallelism = 100以达到与基于任务的速度大致相同的速度,下面的代码可以改进吗?
var downloader = new ActionBlock<string>(async url =>
{
var client = new WebClient();
var resultHtml = await client.DownloadStringTaskAsync(new Uri(url));
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 100 }); …Run Code Online (Sandbox Code Playgroud) 我是 TPL 数据流的新手。
我正在尝试为相当快速的输入流构建一个节流异步更新。BufferBlock 似乎很适合这个想法,因为我可以调用 ReceiveAll() 来从缓冲区中获取所有内容,并且在某些情况下,我无法在 ReceiveAsync() 上等待以在它到达时拾取下一个元素。
但它似乎有时会挂在 ReceiveAsync() 调用上;并且失败的条件很奇怪。
请注意,我对为什么会挂起很感兴趣。我已经找到了另一种使我正在处理的应用程序工作的方法,但它可能不像我没有使用 TPL Dataflow 那样简洁或可扩展,因为我显然不了解它是如何工作的。
进一步说明这里的关键用法是我执行TryReceiveAll()然后等待 ReceiveAsync()如果失败。这是突发数据到达的常见模式,我想将数据作为批处理进行处理。这就是我不想只在ReceiveAsync()上循环的原因,因此为什么直接挂钩ActionBlock或TransformBlock是行不通的。如果我删除TryReceiveAll()我的版本似乎按预期工作;不过,正如其他评论所指出的那样,对于不同的人来说似乎是不同的,所以这可能是巧合。
这是一个失败的示例...将其放入引用和使用System.Threading.Tasks.Dataflow.dll的控制台应用程序中:
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
Run Code Online (Sandbox Code Playgroud)
失败的例子:
class Program
{
static void Main(string[] args)
{
var context = new CancellationTokenSource();
var buffer = new BufferBlock<int>(new DataflowBlockOptions { CancellationToken = context.Token });
var task = Task.Run(() =>ProcessBuffer(buffer, context.Token), context.Token);
// shove 10 things onto the buffer …Run Code Online (Sandbox Code Playgroud) 介绍:
我正在构建一个单节点网络爬虫来简单地验证200 OK.NET Core 控制台应用程序中的URL 。我在不同的主机上有一组 URL,我用HttpClient. 我对使用 Polly 和 TPL Dataflow 还很陌生。
要求:
MaxDegreeOfParallelism.429 TooManyRequests使用 Polly 策略优雅地处理每个主机的响应。或者,我可以使用断路器在收到一个429响应时取消对同一主机的并发请求,然后一次一个地处理该特定主机?当前实施:
我当前的实现是有效的,除了我经常看到x对同一主机的并行请求429大约在同一时间返回......然后,他们都暂停重试策略......然后,他们都猛烈抨击同一台主机再次同时经常仍然收到429s。即使我在整个队列中均匀分布同一主机的多个实例,我的 URL 集合也会因一些429最终仍开始生成s 的特定主机而超重。
收到 a 后429,我想我只想向该主机发送一个并发请求,以尊重远程主机并追求200s。
验证器方法:
public async Task<int> GetValidCount(IEnumerable<Uri> urls, CancellationToken cancellationToken)
{
var validator = new TransformBlock<Uri, bool>(
async u => (await _httpClient.GetAsync(u, HttpCompletionOption.ResponseHeadersRead, cancellationToken)).IsSuccessStatusCode, …Run Code Online (Sandbox Code Playgroud) 考虑这个例子:
class Program
{
private static readonly ITargetBlock<string> Mesh = CreateMesh();
private static readonly AsyncLocal<string> AsyncLocalContext
= new AsyncLocal<string>();
static async Task Main(string[] args)
{
var tasks = Enumerable.Range(1, 4)
.Select(ProcessMessage);
await Task.WhenAll(tasks);
Mesh.Complete();
await Mesh.Completion;
Console.WriteLine();
Console.WriteLine("Done");
}
private static async Task ProcessMessage(int number)
{
var param = number.ToString();
using (SetScopedAsyncLocal(param))
{
Console.WriteLine($"Before send {param}");
await Mesh.SendAsync(param);
Console.WriteLine($"After send {param}");
}
}
private static IDisposable SetScopedAsyncLocal(string value)
{
AsyncLocalContext.Value = value;
return new Disposer(() => AsyncLocalContext.Value = null);
} …Run Code Online (Sandbox Code Playgroud) 我正在使用TPL块来执行可能被用户取消的操作:我提出了两个选项,首先我取消整个块但不取消块内的操作,如下所示:
_downloadCts = new CancellationTokenSource();
var processBlockV1 = new TransformBlock<int, List<int>>(construct =>
{
List<int> properties = GetPropertiesMethod(construct );
var entities = properties
.AsParallel()
.Select(DoSometheningWithData)
.ToList();
return entities;
}, new ExecutionDataflowBlockOptions() { CancellationToken = _downloadCts.Token });
Run Code Online (Sandbox Code Playgroud)
第二个我取消内部操作,但不是块本身:
var processBlockV2 = new TransformBlock<int, List<int>>(construct =>
{
List<int> properties = GetPropertiesMethod(construct);
var entities = properties
.AsParallel().WithCancellation(_downloadCts.Token)
.Select(DoSometheningWithData)
.ToList();
return entities;
});
Run Code Online (Sandbox Code Playgroud)
据我了解,第一个选项将取消整个块,从而关闭整个管道。我的问题是它是否也会取消内部操作并处理所有资源(如果有)(打开 StreamReaders 等),或者最好选择第二个选项,然后我自己可以确保所有内容都被取消和清理,然后我可以使用一些方法(铁路编程)漂浮OperationCanceledException在管道上并在我想要的地方处理它?
c# task-parallel-library cancellationtokensource tpl-dataflow
tpl-dataflow ×10
c# ×9
.net ×2
.net-core ×2
concurrency ×2
actor-model ×1
asynchronous ×1
c#-5.0 ×1
dataflow ×1
performance ×1
polly ×1
web-crawler ×1