我有一个带有ActionBlock的类Receiver:
public class Receiver<T> : IReceiver<T>
{
private ActionBlock<T> _receiver;
public Task<bool> Send(T item)
{
if(_receiver!=null)
return _receiver.SendAsync(item);
//Do some other stuff her
}
public void Register (Func<T, Task> receiver)
{
_receiver = new ActionBlock<T> (receiver);
}
//...
}
Run Code Online (Sandbox Code Playgroud)
ActionBlock的Register-Action是一个带有await-Statement的async-Method:
private static async Task Writer(int num)
{
Console.WriteLine("start " + num);
await Task.Delay(500);
Console.WriteLine("end " + num);
}
Run Code Online (Sandbox Code Playgroud)
现在我想做的是同步等待(如果设置了条件),直到action方法完成以获得独占行为:
var receiver = new Receiver<int>();
receiver.Register((Func<int, Task) Writer);
receiver.Send(5).Wait(); //does not wait the action-await here!
Run Code Online (Sandbox Code Playgroud)
问题是"await Task.Delay(500);" 语句被执行,"receiver.Post(5).Wait();" 不再等了.
我尝试了几种变体(TaskCompletionSource,ContinueWith,...),但它不起作用.
有谁知道如何解决这个问题?
c# asynchronous task-parallel-library async-await tpl-dataflow
我有一些不断流动的某些项目,我需要并行处理,所以我正在使用TPL Dataflow.问题在于,共享相同密钥(类似于字典)的项目应按FIFO顺序处理,而不是彼此平行(它们可以与具有不同值的其他项目并行).
正在完成的工作是CPU绑定最小的异步锁,所以我的解决方案是创建一个ActionBlock<T>大小为s的数组,Environment.ProcessorCount没有并行性,并根据键的GetHashCode值发布到它们.
创建:
_actionBlocks = new ActionBlock<Item>[Environment.ProcessorCount];
for (int i = 0; i < _actionBlocks.Length; i++)
{
_actionBlocks[i] = new ActionBlock<Item>(_ => ProcessItemAsync(_));
}
Run Code Online (Sandbox Code Playgroud)
用法:
bool ProcessItem(Key key, Item item)
{
var actionBlock = _actionBlocks[(uint)key.GetHashCode() % _actionBlocks.Length];
return actionBlock.Post(item);
}
Run Code Online (Sandbox Code Playgroud)
所以,我的问题是,这是我问题的最佳解决方案吗?我是否会损害性能/可扩展性?我错过了什么吗?
我知道a BlockingCollection最适合消费者/生产者模式.然而,当我使用一个ActionBlock从TPL数据流库?
我最初的理解是针对IO操作,保持BlockingCollection同时CPU密集型操作适合于ActionBlock.但我觉得这不是整个故事...任何额外的见解?
.net task-parallel-library data-synchronization tpl-dataflow
TPL Dataflow提供了非常有用的功能:
public static IPropagatorBlock<TInput, TOutput> Encapsulate<TInput, TOutput>(
ITargetBlock<TInput> target,
ISourceBlock<TOutput> source)
Run Code Online (Sandbox Code Playgroud)
使您能够将多个块封装到单个转换块中.它返回一个
IPropagatorBlock<TInput, TOutput>
Run Code Online (Sandbox Code Playgroud)
它代表管道的起始和结束块.
但是,如果我的管道中的最后一个块是ActionBlock,我不能使用它,因为ActionBlock不是SourceBlock,并且函数的返回类型将是ITargetBlock,而不是IPropagatorBlock.
基本上,我正在寻找的是这样的功能:
public static ITargetBlock<TStart> Encapsulate<TStart, TEnd>(
ITargetBlock<TStart> startBlock,
ActionBlock<TEnd> endBlock)
Run Code Online (Sandbox Code Playgroud)
这是一个明智的事情,或者我错过了一些简单的东西?我不太清楚如何来写呢-尤其是类设置完成.我需要创建自己的自定义块类型吗?
编辑:
好的,所以看过@Panagiotis Kanavos的回复,做了一些修补,我想出了这个.这基于EncapsulatingPropagator类,它是现有DataflowBlock.Encapsulate方法使用的类:
internal sealed class EncapsulatingTarget<TStart, TEnd> : ITargetBlock<TStart>
{
private readonly ITargetBlock<TStart> startBlock;
private readonly ActionBlock<TEnd> endBlock;
public EncapsulatingTarget(ITargetBlock<TStart> startBlock, ActionBlock<TEnd> endBlock)
{
this.startBlock = startBlock;
this.endBlock = endBlock;
}
public Task Completion
{
get { return this.endBlock.Completion; }
}
public void Complete()
{
this.startBlock.Complete();
}
void IDataflowBlock.Fault(Exception …Run Code Online (Sandbox Code Playgroud) 我在我的应用程序中使用TPL Dataflow实现了生产者/消费者模式.我有大数据流网格,其中有大约40个块.网格中有两个主要功能部分:生产者部分和消费者部分.生产者应该继续为消费者提供大量工作,而消费者有时会缓慢地处理传入的工作.当消费者忙于一些指定数量的工作项时,我想暂停生产者.否则,该应用程序会占用大量内存/ CPU,并且行为不可持续.
我制作了演示应用程序以演示此问题:
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace DataflowTest
{
class Program
{
static void Main(string[] args)
{
var options = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 4,
EnsureOrdered = false
};
var boundedOptions = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 4,
EnsureOrdered = false,
BoundedCapacity = 5
};
var bufferBlock = new BufferBlock<int>(boundedOptions);
var producerBlock = new TransformBlock<int, int>(x => x + 1, options);
var broadcastBlock = new BroadcastBlock<int>(x => x, options);
var consumerBlock = new …Run Code Online (Sandbox Code Playgroud) I created a TPL Dataflow pipeline which consist of 3 TransformBlock's and an ActionBlock at the end.
var loadXml = new TransformBlock<Job, Job>(job => { ... }); // I/O
var validateData = new TransformBlock<Job, Job>(job => { ... }); // Parsing&Validating&Calculations
var importJob = new TransformBlock<Job, Job>(job => { ... }); // Saving to database
var loadingFailed = new ActionBlock<Job>(job => CreateResponse(job));
var validationFailed = new ActionBlock<Job>(job => CreateResponse(job));
var reportImport = new ActionBlock<Job>(job => CreateResponse(job));
loadXml.LinkTo(validateData, job => job.ReturnCode …Run Code Online (Sandbox Code Playgroud) 我寻找 JoinBlock 的替代方案,它可以通过 n-TransformBlocks 链接,并将所有 TransformBlock 源块的消息连接/合并在一起,以便将此类的集合传递给另一个数据流块。
JoinBlock 可以很好地完成工作,但仅限于连接 3 个源块。它还存在许多效率低下的问题(连接 2 个源块的偶数值类型(整数)非常慢)。有没有办法让任务从 TransformBlocks 返回并等到所有 TransformBlocks 都有一个完成的任务才能接受Task<item>?
任何替代想法?我可能有 1-20 个这样的转换块,在传递连接的项目集合之前,我需要将哪些项目连接在一起。每个转换块都保证为每个“转换”的输入项准确返回一个输出项。
编辑:要求澄清:
根据我之前的一个问题,我按如下方式设置了我的 JoinBlock:
public Test()
{
broadCastBlock = new BroadcastBlock<int>(i =>
{
return i;
});
transformBlock1 = new TransformBlock<int, int>(i =>
{
return i;
});
transformBlock2 = new TransformBlock<int, int>(i =>
{
return i;
});
joinBlock = new JoinBlock<int, int>();
processorBlock = new ActionBlock<Tuple<int, int>>(tuple =>
{
//Console.WriteLine("tfb1: " + tuple.Item1 + "tfb2: " + tuple.Item2);
});
//Linking …Run Code Online (Sandbox Code Playgroud) c# concurrency actor-model task-parallel-library tpl-dataflow
我以前从未使用过 TPL,所以我想知道是否可以用它来完成:我的应用程序从很多帧创建了一个 gif 图像动画文件。我从一个表示 gif 文件帧的 Bitmap 列表开始,需要对每一帧执行以下操作:
显然,这个过程可以对列表中的所有帧并行完成,但对于每个帧,步骤的顺序需要相同。之后,我需要将所有帧写入 gif 文件。因此,所有帧都需要按照它们在原始列表中的相同顺序接收。最重要的是,这个过程可以在第一帧准备好时开始,不需要等到所有帧都处理完。
所以情况就是这样。TPL Dataflow 适合这个吗?如果是的话,谁能给我一个关于如何设计 tpl 块结构以反映上述过程的正确方向的提示?与我发现的一些样本相比,这对我来说似乎相当复杂。
鉴于 TPL 数据流中的以下设置。
var directory = new DirectoryInfo(@"C:\dev\kortforsyningen_dsm\tiles");
var dirBroadcast=new BroadcastBlock<DirectoryInfo>(dir=>dir);
var dirfinder = new TransformManyBlock<DirectoryInfo, DirectoryInfo>((dir) =>
{
return directory.GetDirectories();
});
var tileFilder = new TransformManyBlock<DirectoryInfo, FileInfo>((dir) =>
{
return directory.GetFiles();
});
dirBroadcast.LinkTo(dirfinder);
dirBroadcast.LinkTo(tileFilder);
dirfinder.LinkTo(dirBroadcast);
var block = new XYZTileCombinerBlock<FileInfo>(3, (file) =>
{
var coordinate = file.FullName.Split('\\').Reverse().Take(3).Reverse().Select(s => int.Parse(Path.GetFileNameWithoutExtension(s))).ToArray();
return XYZTileCombinerBlock<CloudBlockBlob>.TileXYToQuadKey(coordinate[0], coordinate[1], coordinate[2]);
},
(quad) =>
XYZTileCombinerBlock<FileInfo>.QuadKeyToTileXY(quad,
(z, x, y) => new FileInfo(Path.Combine(directory.FullName,string.Format("{0}/{1}/{2}.png", z, x, y)))),
() => new TransformBlock<string, string>((s) =>
{
Trace.TraceInformation("Combining {0}", s);
return s;
})); …Run Code Online (Sandbox Code Playgroud) 有什么方法可以通过TPL节流来限制性能下降吗?
我有一个复杂的组件管道,并试图限制所需的内存需求。我从多个文件中并行读取,管道中的组件可能会从这些文件的随机部分中读取一些内容,其余组件则进行CPU绑定操作。
我使用通用测试方法将性能测试平台简化为这些测试。
private void TPLPerformaceTest(int generateNumbers, ExecutionDataflowBlockOptions transformBlockOptions)
{
var transformBlock = new TransformBlock<int, int>(i => i, transformBlockOptions);
var storedCount = 0;
var generatedCount = 0;
var store = new ActionBlock<int>(i => Interlocked.Increment(ref storedCount));
transformBlock.LinkTo(store);
transformBlock.Completion.ContinueWith(_ => store.Complete());
for (int i = 0; i < generateNumbers; i++)
{
transformBlock.SendAsync(i).Wait(); //To ensure delivery
Interlocked.Increment(ref generatedCount);
}
transformBlock.Complete();
store.Completion.Wait();
Assert.IsTrue(generatedCount == generateNumbers);
Assert.IsTrue(storedCount == generateNumbers);
}
Run Code Online (Sandbox Code Playgroud)
第一个没有节流。在我的CPU上,大约需要12秒钟才能完成,消耗约800MB的RAM,平均CPU利用率约为35%。
[Test]
public void TPLPerformaceUnlimitedTest()
{
var …Run Code Online (Sandbox Code Playgroud) c# parallel-processing performance performance-testing tpl-dataflow
tpl-dataflow ×10
c# ×9
.net ×4
asynchronous ×3
async-await ×2
dataflow ×2
actor-model ×1
concurrency ×1
performance ×1