Ben*_*ter 5 .net task-parallel-library tpl-dataflow
我正在寻找一个轻量级,进程中,异步消息总线,并遇到了TPL Dataflow.
我目前的实现如下(https://gist.github.com/4416655上的完整示例).
public class Bus
{
private readonly BroadcastBlock<object> broadcast =
new BroadcastBlock<object>(message => message);
private readonly ConcurrentDictionary<Guid, IDisposable> subscriptions
= new ConcurrentDictionary<Guid, IDisposable>();
public Task SendAsync<TMessage>(TMessage message)
{
return SendAsync<TMessage>(message, CancellationToken.None);
}
public Task SendAsync<TMessage>(TMessage message, CancellationToken cancellationToken)
{
return broadcast.SendAsync(message, cancellationToken);
}
public Guid Subscribe<TMessage>(Action<TMessage> handlerAction)
{
var handler = new ActionBlock<object>(message => handlerAction((TMessage)message));
var subscription = broadcast.LinkTo(handler,
new DataflowLinkOptions { PropagateCompletion = true },
message => message is TMessage);
return AddSubscription(subscription);
}
public void Unsubscribe(Guid subscriptionId)
{
IDisposable subscription;
if (subscriptions.TryRemove(subscriptionId, out subscription))
{
subscription.Dispose();
}
}
private Guid AddSubscription(IDisposable subscription)
{
var subscriptionId = Guid.NewGuid();
subscriptions.TryAdd(subscriptionId, subscription);
return subscriptionId;
}
}
Run Code Online (Sandbox Code Playgroud)
关于在消息传递方案中使用TPL Dataflow,我有一些一般性问题.
BroadcastBlock<T>同时向多个处理程序发送消息的推荐来源?这是我根据这篇文章得出的结论.BroadcastBlock<T>为所有消息类型使用单个实例.在处理大量邮件时,这会导致问题吗?我应该为每种消息类型创建一个单独的实例吗BroadcastBlock<T>始终存储最后发送的项目.这意味着任何新的订阅(链接)都将自动传递此消息.是有可能改变这种行为(新订阅者只能接受新的消息).在我的测试应用程序中,我在第一个处理程序中引入了延迟:
// Subscribe to Message type
var subscription1 = bus.Subscribe<Message>(async m => {
await Task.Delay(2000);
Console.WriteLine("{0} Handler 1: {1}.", m.TimeStamp, m.Content);
});
Run Code Online (Sandbox Code Playgroud)
发送消息时,我希望看到每个消息逐个输出到控制台,增量为2秒.相反,在2s之后,所有消息都立即输出.我假设这是由于底层调度程序执行的并行性,但我很好奇如何更改这些设置(设置MaxDegreeOfParallelism = 1没有区别).
最后,虽然SendAsync允许我等待发送消息,但它不允许我在完成目标(等ActionBlock<T>)时等待.我认为这是PropagateCompletion可行的,但似乎并非如此.理想情况下,我想知道消息的所有处理程序何时执行.
我没有得到预期行为的原因Task.Delay是这延迟了每个处理程序的执行,而不是所有处理程序的处理.Thread.Sleep是我需要的.
回答完问题后(见下文),我意识到使用TPL Dataflow块对您的设计进行建模可能不是一个好主意.TDF适用于基本上独立的块处理消息,没有内置的跟踪单个消息的方式.但这就是你想要的东西:处理程序按顺序处理消息,跟踪每条消息的完成情况.
因此,我认为您不应该创建整个数据流网络,而是使用单个ActionBlock作为异步消息处理器:
public class Bus
{
class Subscription
{
public Guid Id { get; private set; }
public Func<object, Task> HandlerAction { get; private set; }
public Subscription(Guid id, Func<object, Task> handlerAction)
{
Id = id;
HandlerAction = handlerAction;
}
}
private readonly ConcurrentQueue<Subscription> m_handlersToSubscribe = new ConcurrentQueue<Subscription>();
private readonly ConcurrentQueue<Guid> m_idsToUnsubscribe = new ConcurrentQueue<Guid>();
private readonly ActionBlock<Tuple<object, Action>> m_messageProcessor;
public Bus()
{
// subscriptions is accessed only from the (single-threaded) ActionBlock, so it is thread-safe
var subscriptions = new List<Subscription>();
m_messageProcessor = new ActionBlock<Tuple<object, Action>>(
async tuple =>
{
var message = tuple.Item1;
var completedAction = tuple.Item2;
// could be made more efficient, probably doesn't matter
Guid idToUnsubscribe;
while (m_idsToUnsubscribe.TryDequeue(out idToUnsubscribe))
{
subscriptions.RemoveAll(s => s.Id == idToUnsubscribe);
}
Subscription handlerToSubscribe;
while (m_handlersToSubscribe.TryDequeue(out handlerToSubscribe))
{
subscriptions.Add(handlerToSubscribe);
}
foreach (var subscription in subscriptions)
{
await subscription.HandlerAction(message);
}
completedAction();
});
}
public Task SendAsync<TMessage>(TMessage message)
{
var tcs = new TaskCompletionSource<bool>();
Action completedAction = () => tcs.SetResult(true);
m_messageProcessor.Post(new Tuple<object, Action>(message, completedAction));
return tcs.Task;
}
public Guid Subscribe<TMessage>(Action<TMessage> handlerAction)
{
return Subscribe<TMessage>(
message =>
{
handlerAction(message);
// we need a completed non-generic Task; this is a simple, efficient way to get it
// another option would be to use async lambda with no await,
// but that's less efficient and produces a warning
return Task.FromResult(false);
});
}
public Guid Subscribe<TMessage>(Func<TMessage, Task> handlerAction)
{
Func<object, Task> actionWithCheck = async message =>
{
if (message is TMessage)
await handlerAction((TMessage)message);
};
var id = Guid.NewGuid();
m_handlersToSubscribe.Enqueue(new Subscription(id, actionWithCheck));
return id;
}
public void Unsubscribe(Guid subscriptionId)
{
m_idsToUnsubscribe.Enqueue(subscriptionId);
}
}
Run Code Online (Sandbox Code Playgroud)
(我决定使用队列进行订阅和取消订阅,以便在处理邮件时处理程序列表不会更改.)
是否
BroadcastBlock<T>同时向多个处理程序发送消息的推荐来源?
是的,乍一看,听起来就像BroadcastBlock<T>是你想要的.在TPL Dataflow中肯定没有任何类似的块.
在我的实现中,我正在为所有消息类型使用单个BroadcastBlock实例.在处理大量邮件时,这会导致问题吗?我应该为每种消息类型创建一个单独的实例吗
对于所有消息类型使用单个块,您可以在单个线程上执行更多工作(发送给所有处理程序).对于每种消息类型使用一个块,您可以在较少的线程上执行较少的工作(仅发送给正确的处理程序).因此,我认为假设后者会更快是合理的.
但是不要忘记应用程序的性能优化规则:首先,编写简单易读的代码.只有事实证明它实际上很慢,尝试优化它.在比较两种选择时,总是使用分析来确定哪一种实际上更快,不要只猜测哪一种应该更快.
BroadcastBlock<T>始终存储最后发送的项目.这意味着任何新的订阅(链接)都将自动传递此消息.是有可能改变这种行为(新订阅者只能接受新的消息)?
不,没有办法配置BroadcastBlock<T>这样做.如果您不需要所有功能BroadcastBlock<T>(发送到具有有限容量的块,可能是暂时已满,支持非贪婪块作为目标),您可能需要编写自定义版本BroadcastBlock<T>来执行此操作.
发送消息时,我希望看到每个消息逐个输出到控制台,增量为2秒.相反,在2s之后,所有消息都立即输出.我假设这是由于底层调度程序执行的并行性,但我很好奇如何更改这些设置(设置
MaxDegreeOfParallelism = 1没有区别).
TDF的一个要点是每个块都是独立的,因此多个块可以在多个线程上执行.如果那不是你想要的,那么ActionBlock<T>对每个处理程序使用单独的可能不是最好的解决方案.事实上,TDF可能根本不是最好的解决方案.
此外,Subscribe()接受Action<TMessage>,这意味着您的lambda将被编译为async void方法.这些应仅用于特定(且相对罕见)的情况,在这种情况下,您没有其他选择.如果你想支持async处理程序,你应该接受async Task方法,即Func<TMessage, Task>.
我没有得到预期行为的原因
Task.Delay是这延迟了每个处理程序的执行,而不是所有处理程序的处理.Thread.Sleep是我需要的.
使用Thread.Sleep()与异步的整个想法相反,如果可能的话,你不应该使用它.另外,我认为它实际上并不像你想要的那样:它为每个线程引入了一个延迟,但是TPL Dataflow将使用多个线程,所以这不会像你想象的那样.
最后,虽然
SendAsync允许我等待发送消息,但它不允许我在完成目标(等ActionBlock<T>)时等待.我认为这是PropagateCompletion可行的,但似乎并非如此.理想情况下,我想知道消息的所有处理程序何时执行.
PropagateCompletion连同Complete()和Completion为整个块的移交完成,而不是一个单一的消息的处理.其中一个原因是数据流网络更复杂,可能不清楚何时处理消息.例如,如果消息已经发送到a的所有当前目标BroadcastBlock<T>,但是也会发送到所有新添加的目标,是否应该被视为完成?
如果你想这样做,你必须以某种方式手动完成,可能使用TaskCompletionSource.