Akka.NET 高效查询参与者

Max*_*Max 1 task-parallel-library akka.net

我正在使用 Akka.NET 为生产项目创建概念证明,但我面临查询概念理解问题。

情况如下: CoordinatorActor 有一个包含数千个Hotel-Actors.

我想查询所有Hotel-Actors在特定日期有空房的酒店。

当然,我可以通过它们 foreach 并发送.Ask<>特定日期的请求。拿着所有任务的参考并做一个Task.WhenAll(requests). 但这感觉有点不自然。

或者,我可以一次向所有酒店(ActorSelection 或路由器)发送请求特定日期的广播消息,但我不知道他们何时回复了Tell消息。

有没有人有建议如何解决这个问题?

Bar*_*ski 6

是的,你的感受就在这里。使用 Ask 进行参与者之间的通信被认为是非常低效的——主要是因为每个 ask 需要分配单独的消息侦听器。

第一个好问题是:您是否需要等待他们全部回复?也许可以在响应来时发出响应。

万一,当您需要在回复之前收集所有数据时,您需要有某种方法来计算所有消息,以保证其中一些消息是否仍在等待中 - 在这种情况下使用ActorSelection是不可行的。您将需要计数器或可以与每条消息关联的标识符列表 - 虽然它们甚至可以是普通数字,但通常IActorRefs 更容易使用。

您可以在下面看到Aggregator可以为此特定任务创建的参与者的简化示例- 一旦没有更多消息要等待或发生超时,它将自动返回所有回复,它收到并停止自己。

class Aggregator<T> : ReceiveActor
{
    private IActorRef originalSender;
    private ISet<IActorRef> refs;

    public Aggregator(ISet<IActorRef> refs)
    {
        this.refs = refs;
        // this operation will finish after 30 sec of inactivity
        // (when no new message arrived)
        Context.SetReceiveTimeout(TimeSpan.FromSeconds(30));
        ReceiveAny(x =>
        {
            originalSender = Sender;
            foreach (var aref in refs) aref.Tell(x);
            Become(Aggregating);
        });
    }

    private void Aggregating()
    {
        var replies = new List<T>();
        // when timeout occurred, we reply with what we've got so far
        Receive<ReceiveTimeout>(_ => ReplyAndStop(replies));
        Receive<T>(x =>
        {
            if (refs.Remove(Sender)) replies.Add(x);
            if (refs.Count == 0) ReplyAndStop(replies);
        });
    }

    private void ReplyAndStop(List<T> replies)
    {
        originalSender.Tell(new AggregatedReply<T>(replies));
        Context.Stop(Self);
    }
}
Run Code Online (Sandbox Code Playgroud)