Nik*_*a B 5 .net c# tpl-dataflow
So, I'm trying to wrap my head around Microsoft's Dataflow library. I've built a very simple pipeline consisting of just two blocks:
var start = new TransformBlock<Foo, Bar>();
var end = new ActionBlock<Bar>();
start.LinkTo(end);
Now I can process a Foo instance asynchronously by calling:
start.SendAsync(new Foo());
What I don't understand is how to do the processing synchronously when needed. I thought that waiting on SendAsync would be enough:
start.SendAsync(new Foo()).Wait();
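But apparently it returns as soon as the item has been accepted by the first block in the pipeline, not when the item has been fully processed. So is there a way to wait until the last (end) block has processed a given item? Other than passing a WaitHandle through the entire pipeline, that is.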
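In short, that's not supported out of the box in Dataflow. Essentially, what you need to do is tag the data so that you can retrieve it when processing is done. I've written up a way to do this that lets the consumer await a job as it gets processed by the pipeline. The only concession to pipeline design is that each block takes a KeyValuePair<Guid, T>. Below is the basic JobManager, along with the post I wrote about it. Note that the code in the post is a bit dated and needs some updates, but it should point you in the right direction.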
namespace ConcurrentFlows.DataflowJobs {
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    /// <summary>
    /// A generic interface defining that:
    /// for a specified input type => an awaitable result is produced.
    /// </summary>
    /// <typeparam name="TInput">The type of data to process.</typeparam>
    /// <typeparam name="TOutput">The type of data the consumer expects back.</typeparam>
    public interface IJobManager<TInput, TOutput> {
        Task<TOutput> SubmitRequest(TInput data);
    }

    /// <summary>
    /// A TPL-Dataflow based job manager.
    /// </summary>
    /// <typeparam name="TInput">The type of data to process.</typeparam>
    /// <typeparam name="TOutput">The type of data the consumer expects back.</typeparam>
    public class DataflowJobManager<TInput, TOutput> : IJobManager<TInput, TOutput> {

        /// <summary>
        /// It is anticipated that jobHandler is an injected
        /// singleton instance of a Dataflow based 'calculator'.
        /// </summary>
        /// <param name="jobHandler">A singleton Dataflow block through which all jobs are processed.</param>
        public DataflowJobManager(IPropagatorBlock<KeyValuePair<Guid, TInput>, KeyValuePair<Guid, TOutput>> jobHandler) {
            if (jobHandler == null) { throw new ArgumentNullException(nameof(jobHandler)); }

            // Assign the constructor parameter (lower-case 'j'), not the property to itself.
            this.JobHandler = jobHandler;
            if (!alreadyLinked) {
                JobHandler.LinkTo(ResultHandler, new DataflowLinkOptions() { PropagateCompletion = true });
                alreadyLinked = true;
            }
        }

        // Static, so only the first jobHandler seen for a given <TInput, TOutput>
        // pair is linked to the shared ResultHandler.
        private static bool alreadyLinked = false;

        /// <summary>
        /// Submits the request to the JobHandler and asynchronously awaits the result.
        /// </summary>
        /// <param name="data">The input data to be processed.</param>
        /// <returns>A task that completes with the job's result.</returns>
        public async Task<TOutput> SubmitRequest(TInput data) {
            var taggedData = TagInputData(data);
            var job = CreateJob(taggedData);
            Jobs.TryAdd(job.Key, job.Value);

            // SendAsync yields false when the block declines the message (for
            // example, after it has completed); without this check the caller
            // would wait forever on a job that never runs.
            if (!await JobHandler.SendAsync(taggedData)) {
                TaskCompletionSource<TOutput> declined;
                Jobs.TryRemove(job.Key, out declined);
                throw new InvalidOperationException($"The jobId: {job.Key} was declined by the job handler.");
            }
            return await job.Value.Task;
        }

        // Pending jobs, keyed by the Guid that travels with each item.
        private static ConcurrentDictionary<Guid, TaskCompletionSource<TOutput>> Jobs {
            get;
        } = new ConcurrentDictionary<Guid, TaskCompletionSource<TOutput>>();

        private static ExecutionDataflowBlockOptions Options {
            get;
        } = GetResultHandlerOptions();

        private static ITargetBlock<KeyValuePair<Guid, TOutput>> ResultHandler {
            get;
        } = CreateReplyHandler(Options);

        private IPropagatorBlock<KeyValuePair<Guid, TInput>, KeyValuePair<Guid, TOutput>> JobHandler {
            get;
        }

        private KeyValuePair<Guid, TInput> TagInputData(TInput data) {
            var id = Guid.NewGuid();
            return new KeyValuePair<Guid, TInput>(id, data);
        }

        private KeyValuePair<Guid, TaskCompletionSource<TOutput>> CreateJob(KeyValuePair<Guid, TInput> taggedData) {
            var id = taggedData.Key;
            var jobCompletionSource = new TaskCompletionSource<TOutput>();
            return new KeyValuePair<Guid, TaskCompletionSource<TOutput>>(id, jobCompletionSource);
        }

        private static ExecutionDataflowBlockOptions GetResultHandlerOptions() {
            return new ExecutionDataflowBlockOptions() {
                MaxDegreeOfParallelism = Environment.ProcessorCount,
                BoundedCapacity = 1000
            };
        }

        private static ITargetBlock<KeyValuePair<Guid, TOutput>> CreateReplyHandler(ExecutionDataflowBlockOptions options) {
            return new ActionBlock<KeyValuePair<Guid, TOutput>>((result) => {
                ReceiveOutput(result);
            }, options);
        }

        // Looks up the waiting job by its tag and completes it with the result.
        private static void ReceiveOutput(KeyValuePair<Guid, TOutput> result) {
            var jobId = result.Key;
            TaskCompletionSource<TOutput> jobCompletionSource;
            if (!Jobs.TryRemove(jobId, out jobCompletionSource)) {
                throw new InvalidOperationException($"The jobId: {jobId} was not found.");
            }
            var resultValue = result.Value;
            jobCompletionSource.SetResult(resultValue);
        }
    }
}
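To see the tagging idea in isolation, here is a stripped-down, single-file sketch of the same technique applied to a two-block pipeline like the one in the question. It is not the JobManager itself: the names TaggedPipelineSketch, Pending, Start, End and SubmitAsync are made up for illustration, and int/string stand in for Foo/Bar.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class TaggedPipelineSketch {
    // Callers waiting for a result, keyed by the Guid that travels with each item.
    private static readonly ConcurrentDictionary<Guid, TaskCompletionSource<string>> Pending =
        new ConcurrentDictionary<Guid, TaskCompletionSource<string>>();

    // Hypothetical first stage: takes a tagged input and returns a tagged output.
    private static readonly TransformBlock<KeyValuePair<Guid, int>, KeyValuePair<Guid, string>> Start =
        new TransformBlock<KeyValuePair<Guid, int>, KeyValuePair<Guid, string>>(
            item => new KeyValuePair<Guid, string>(item.Key, "processed " + item.Value));

    // Last stage: finds the waiting caller by tag and completes its task.
    private static readonly ActionBlock<KeyValuePair<Guid, string>> End =
        new ActionBlock<KeyValuePair<Guid, string>>(result => {
            TaskCompletionSource<string> waiter;
            if (Pending.TryRemove(result.Key, out waiter)) {
                waiter.SetResult(result.Value);
            }
        });

    static TaggedPipelineSketch() {
        Start.LinkTo(End, new DataflowLinkOptions { PropagateCompletion = true });
    }

    // Sends one value through the pipeline; the returned task completes only
    // after the End block has handled that particular item.
    public static async Task<string> SubmitAsync(int value) {
        var id = Guid.NewGuid();
        var waiter = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
        Pending.TryAdd(id, waiter);
        await Start.SendAsync(new KeyValuePair<Guid, int>(id, value));
        return await waiter.Task;
    }

    public static async Task Main() {
        // Asynchronous caller.
        Console.WriteLine(await SubmitAsync(42)); // prints "processed 42"

        // A caller that must block can wait on the task instead.
        Console.WriteLine(SubmitAsync(7).Result); // prints "processed 7"
    }
}
```

Blocking on .Result or .Wait() is fine in a console app like this one, but it can deadlock in environments that have a synchronization context (UI threads, classic ASP.NET), so prefer await wherever the caller can be made async.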