Max*_*Max 1 task-parallel-library akka.net
我正在使用 Akka.NET 为生产项目创建概念证明,但我面临查询概念理解问题。
情况如下: CoordinatorActor 有一个包含数千个Hotel-Actors.
我想查询所有Hotel-Actors在特定日期有空房的酒店。
当然,我可以通过它们 foreach 并发送.Ask<>特定日期的请求。拿着所有任务的参考并做一个Task.WhenAll(requests). 但这感觉有点不自然。
或者,我可以一次向所有酒店(ActorSelection 或路由器)发送请求特定日期的广播消息,但我不知道他们何时都回复了Tell消息。
有没有人有建议如何解决这个问题?
是的,你的感受就在这里。使用 Ask 进行参与者之间的通信被认为是非常低效的——主要是因为每个 ask 需要分配单独的消息侦听器。
第一个好问题是:您是否需要等待他们全部回复?也许可以在响应来时发出响应。
万一,当您需要在回复之前收集所有数据时,您需要有某种方法来计算所有消息,以保证其中一些消息是否仍在等待中 - 在这种情况下使用ActorSelection是不可行的。您将需要计数器或可以与每条消息关联的标识符列表 - 虽然它们甚至可以是普通数字,但通常IActorRefs 更容易使用。
您可以在下面看到Aggregator可以为此特定任务创建的参与者的简化示例- 一旦没有更多消息要等待或发生超时,它将自动返回所有回复,它收到并停止自己。
class Aggregator<T> : ReceiveActor
{
private IActorRef originalSender;
private ISet<IActorRef> refs;
public Aggregator(ISet<IActorRef> refs)
{
this.refs = refs;
// this operation will finish after 30 sec of inactivity
// (when no new message arrived)
Context.SetReceiveTimeout(TimeSpan.FromSeconds(30));
ReceiveAny(x =>
{
originalSender = Sender;
foreach (var aref in refs) aref.Tell(x);
Become(Aggregating);
});
}
private void Aggregating()
{
var replies = new List<T>();
// when timeout occurred, we reply with what we've got so far
Receive<ReceiveTimeout>(_ => ReplyAndStop(replies));
Receive<T>(x =>
{
if (refs.Remove(Sender)) replies.Add(x);
if (refs.Count == 0) ReplyAndStop(replies);
});
}
private void ReplyAndStop(List<T> replies)
{
originalSender.Tell(new AggregatedReply<T>(replies));
Context.Stop(Self);
}
}
Run Code Online (Sandbox Code Playgroud)