我知道a BlockingCollection最适合消费者/生产者模式.然而,当我使用一个ActionBlock从TPL数据流库?
我最初的理解是针对IO操作,保持BlockingCollection同时CPU密集型操作适合于ActionBlock.但我觉得这不是整个故事...任何额外的见解?
.net task-parallel-library data-synchronization tpl-dataflow
我有一个数据流,我以几种不同的方式处理...所以我想发送每个消息的副本我得到多个目标,以便这些目标可以并行执行...但是,我需要设置BoundedCapacity在我的块上,因为数据的流式传输方式比我的目标可以处理的速度快,并且有大量数据.如果没有BoundedCapacity,我会很快耗尽内存.
但是问题是如果目标无法处理它,BroadcastBlock将丢弃消息(由于BoundedCapacity).
我需要的是一个不会丢弃消息的BroadcastBlock,但实质上会拒绝其他输入,直到它可以向每个目标传递消息然后准备好更多.
有这样的东西,还是有人编写了一个以这种方式运行的自定义块?
我有一个BatchBlock与BoundedCapacity在其上定义
var _batchBlock = new BatchBlock<int>(2, new GroupingDataflowBlockOptions
{BoundedCapacity = 100 });
Run Code Online (Sandbox Code Playgroud)
因此,如果队列容量达到100,则该块会推迟收到的每条消息,直到某个地点可用.在这种情况下,批处理队列被认为是贪婪还是非贪婪?
我该如何使用DataflowBlockOptions.CancellationToken?
BufferBlock如果我创建这样的实例:
var queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5, CancellationToken = _cts.Token });
那么使用使用的消费者/生产者方法queue,我如何使用它的 CancellationToken 来处理取消?
例如,在生产者方法中,如何检查取消令牌 - 我没有找到任何属性来访问令牌。
编辑:生产/消费方法示例:
private static async Task Produce(BufferBlock<int> queue, IEnumerable<int> values)
{
foreach (var value in values)
{
await queue.SendAsync(value);
}
queue.Complete();
}
private static async Task<IEnumerable<int>> Consume(BufferBlock<int> queue)
{
var ret = new List<int>();
while (await queue.OutputAvailableAsync())
{
ret.Add(await queue.ReceiveAsync());
}
return ret;
}
Run Code Online (Sandbox Code Playgroud)
调用它的代码:
var queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = …Run Code Online (Sandbox Code Playgroud) 我正在构建一个Dataflows管道,它的工作就是处理大文件.每个文件都经过解析,分析和渲染; 但是每个文件可能在管道中采用不同的路径,具体取决于它是什么类型的文件.
此管道的用户界面包含要处理的文件列表,以及进度条和每个文件旁边的"取消"按钮(当然,还有一个用于向队列添加新文件的按钮).当用户单击特定文件旁边的"取消"按钮时,我想从管道中删除该文件.
我必须遗漏一些东西,因为我无法弄清楚如何做到这一点.我知道我可以取消整个块,但我不想这样做,我只想取消管道中的单个项目.那么,我错过了什么?
TPL Dataflow提供了非常有用的功能:
public static IPropagatorBlock<TInput, TOutput> Encapsulate<TInput, TOutput>(
ITargetBlock<TInput> target,
ISourceBlock<TOutput> source)
Run Code Online (Sandbox Code Playgroud)
使您能够将多个块封装到单个转换块中.它返回一个
IPropagatorBlock<TInput, TOutput>
Run Code Online (Sandbox Code Playgroud)
它代表管道的起始和结束块.
但是,如果我的管道中的最后一个块是ActionBlock,我不能使用它,因为ActionBlock不是SourceBlock,并且函数的返回类型将是ITargetBlock,而不是IPropagatorBlock.
基本上,我正在寻找的是这样的功能:
public static ITargetBlock<TStart> Encapsulate<TStart, TEnd>(
ITargetBlock<TStart> startBlock,
ActionBlock<TEnd> endBlock)
Run Code Online (Sandbox Code Playgroud)
这是一个明智的事情,或者我错过了一些简单的东西?我不太清楚如何来写呢-尤其是类设置完成.我需要创建自己的自定义块类型吗?
编辑:
好的,所以看过@Panagiotis Kanavos的回复,做了一些修补,我想出了这个.这基于EncapsulatingPropagator类,它是现有DataflowBlock.Encapsulate方法使用的类:
internal sealed class EncapsulatingTarget<TStart, TEnd> : ITargetBlock<TStart>
{
private readonly ITargetBlock<TStart> startBlock;
private readonly ActionBlock<TEnd> endBlock;
public EncapsulatingTarget(ITargetBlock<TStart> startBlock, ActionBlock<TEnd> endBlock)
{
this.startBlock = startBlock;
this.endBlock = endBlock;
}
public Task Completion
{
get { return this.endBlock.Completion; }
}
public void Complete()
{
this.startBlock.Complete();
}
void IDataflowBlock.Fault(Exception …Run Code Online (Sandbox Code Playgroud) 我试图使用文件IO操作实现读写操作,并将这些操作封装在一起TransformBlock,以使这些操作线程安全,而不是使用锁定机制.
但问题是,当我尝试并行写入5个文件时,有一个异常的内存,并且在使用此实现时它阻止了UI线程.该实现在Windows Phone项目中完成.请说明这个实现有什么问题.
文件IO操作
public static readonly IsolatedStorageFile _isolatedStore = IsolatedStorageFile.GetUserStoreForApplication();
public static readonly FileIO _file = new FileIO();
public static readonly ConcurrentExclusiveSchedulerPair taskSchedulerPair = new ConcurrentExclusiveSchedulerPair();
public static readonly ExecutionDataflowBlockOptions exclusiveExecutionDataFlow
= new ExecutionDataflowBlockOptions
{
TaskScheduler = taskSchedulerPair.ExclusiveScheduler,
BoundedCapacity = 1
};
public static readonly ExecutionDataflowBlockOptions concurrentExecutionDataFlow
= new ExecutionDataflowBlockOptions
{
TaskScheduler = taskSchedulerPair.ConcurrentScheduler,
BoundedCapacity = 1
};
public static async Task<T> LoadAsync<T>(string fileName)
{
T result = default(T);
var transBlock = new TransformBlock<string, T>
(async fName …Run Code Online (Sandbox Code Playgroud) 我的情况是我有一个来自外部源的BufferBlock<Stream>接收Stream,比方说文件系统或一些FTP服务器.这些文件Stream将传递到另一个块并进行处理.
唯一的问题是其中一些文件是压缩的,我想Block在中间添加一个可以在必要时解压缩文件,并Stream为每个条目创建多个输出.
但是我不想使用TransformBlockMany,因为这意味着我必须完全接收ZIP Stream并立即创建输出Stream数组.
我希望这Block能够接收ZIP Stream,开始解压缩,并Push在条目就绪时进入下一个流,因此Process Block可以在第一个文件解压缩后立即开始处理,而不是等到所有内容都解压缩.
我该怎么做呢?
我正在使用 TPL 数据流构建应用程序。其实我有以下问题。我有一个 transformblock var tfb1 = new TranformBlock<InMsg, IReadOnlyCollection<OutMsg>>。所以tfb1接收消息并创建一个输出消息列表。此输出消息列表应链接到路由器数据块,该数据块OutMsg作为输入接收(而不是IReadOnlyCollection<OutMsg>)。
我怎样才能展平IReadOnlyCollection以便包含的消息可以用作输入,例如TransformBlock<OutMsg, SomeOtherType>. 可以通过LinkTo()吗?
谢谢
我创建了类似于网络爬虫的东西来创建我需要管理的 1000 多个 Web 服务的报告。因此,我创建了一个 TPL 数据流管道来管理获取和处理数据。我想象的管道看起来有点像这样(对不起我的绘画技巧:D):

我已经创建了一个实现并且一切正常,直到我开始作为一个整体开始我的管道。我将 500 个对象作为管道的输入提供给管道,并希望程序运行一段时间,但程序在移动到执行块后停止执行。在检查程序的流程后,在我看来,完成快速传播到处置块。我使用相同的管道创建了一个小示例项目,以检查它是我对输入类的实现还是管道本身。示例代码是这样的:
public class Job
{
public int Ticker { get; set; }
public Type Type { get; }
public Job(Type type)
{
Type = type;
}
public Task Prepare()
{
Console.WriteLine("Preparing");
Ticker = 0;
return Task.CompletedTask;
}
public Task Tick()
{
Console.WriteLine("Ticking");
Ticker++;
return Task.CompletedTask;
}
public bool IsCommitable()
{
Console.WriteLine("Trying to commit");
return IsFinished() || ( Ticker != 0 && Ticker % 100000 == 0);
}
public bool IsFinished()
{ …Run Code Online (Sandbox Code Playgroud)