使用LIFO逻辑运行的MailboxProcessor

NoI*_*his 5 f# asynchronous agent mailboxprocessor lifo

我正在学习F#agents(MailboxProcessor).

我正在处理一个非常传统的问题.

  • 我有一个代理(dataSource),它是流数据的来源.数据必须由一系列代理(dataProcessor)处理.我们可以将其dataProcessor视为某种跟踪设备.
  • 数据的流动速度可能快于dataProcessor可以处理其输入的速度.
  • 有一些延迟是可以的.但是,我必须确保代理人始终处于工作之上,并且不会在过时的观察中被堆积

我正在探索解决这个问题的方法.

一个想法是实现堆栈(LIFO)dataSource.dataSourcedataProcessor可用于接收和处理数据时,将发送最新的观察结果.该解决方案可能有效,但dataProcessor可能需要被阻止和重新激活,因此可能会变得复杂; 并传达其状态dataSource,导致双向沟通问题.这个问题可以归结为一个blocking queue消费者-生产者问题,但我不知道..

第二个想法是有 dataProcessor照顾的消息排序.在这个架构中,dataSource只需在dataProcessor队列中发布更新.dataProcessor将用于Scan获取队列中可用的最新数据.这可能是要走的路.但是,我不确定在当前的设计中MailboxProcessor是否可以清除消息队列,删除旧的过时消息.此外,在这里写道:

不幸的是,当前版本的F#中的TryScan功能有两种方式.首先,重点是指定超时,但实现实际上并不尊重它.具体而言,不相关的消息会重置计时器.其次,与其他扫描功能一样,在锁定下检查消息队列,该锁定防止任何其他线程在扫描期间发布,这可能是任意长的时间.因此,TryScan函数本身往往会锁定并发系统,甚至可能引入死锁,因为调用者的代码是在锁内部进行评估的(例如,当锁定下的代码阻塞时,从函数参数发送到Scan或TryScan会使代理死锁获得已经存在的锁定.

将最新观察结果反弹可能是一个问题.这篇文章的作者@Jon Harrop暗示了这一点

我设法围绕它进行构建,结果架构实际上更好.本质上,我热切地Receive使用我自己的本地队列来消息和过滤所有消息.

这个想法肯定值得探索,但在开始使用代码之前,我会欢迎一些关于如何构建解决方案的输入.

谢谢.

V.B*_*.B. 1

tl;dr我会尝试这个:从 FSharp.Actor 或 Zach Bray 的博客文章中获取 Mailbox 实现,用 ConcurrentStack 替换 ConcurrentQueue(加上一些有界容量逻辑),并使用这个更改后的代理作为调度程序将消息从 dataSource 传递到一大群数据处理器实现为普通 MBP 或 Actor。

tl;dr2如果工作人员是一种稀缺且缓慢的资源,并且我们需要在工作人员准备就绪时处理最新的消息,那么这一切都归结为具有堆栈而不是队列的代理(带有一些有界的队列)容量逻辑)加上工作人员的 BlockingQueue。调度程序将就绪的工作程序出列,然后从堆栈中弹出一条消息并将该消息发送给该工作程序。工作完成后,工作人员在准备就绪时(例如 before let! msg = inbox.Receive())将自己排队到队列中。然后,调度程序消费者线程会阻塞,直到任何工作程序准备就绪,而生产者线程则保持有界堆栈更新。(有界堆栈可以用锁内的数组+偏移量+大小来完成,下面的太复杂了)

细节

MailBoxProcessor 被设计为只有一个消费者。这甚至在MBP 的源代码中进行了注释(搜索单词“DRAGONS”:))

如果您将数据发送到 MBP,则只有一个线程可以从内部队列或堆栈中获取数据。在您的特定用例中,我将直接使用 ConcurrentStack或更好地包装到BlockingCollection中:

  • 它将允许许多并发消费者
  • 它非常快并且线程安全
  • BlockingCollection具有BoundedCapacity允许您限制集合大小的属性。它会抛出Add,但你可以抓住它或使用它TryAdd。如果 A 是主堆栈,B 是备用堆栈,则TryAdd转到 A,如果为 false,则转到 B 并使用Interlocked.ExchangeAdd交换两者,然后处理 A 中所需的消息,清除它,创建一个新的备用堆栈 - 或者如果处理则使用三个堆栈A 的长度可能比 B 再次充满的时间长;通过这种方式,您不会阻塞也不会丢失任何消息,但可以通过受控方式丢弃不需要的消息。

BlockingCollection 具有诸如 AddToAny/TakeFromAny 之类的方法,它们适用于 BlockingCollections 数组。这可能会有所帮助,例如:

  • dataSource 使用 ConcurrentStack 实现 (BCCS) 向 BlockingCollection 生成消息
  • 另一个线程使用来自 BCCS 的消息并将它们发送到处理 BCCS 的数组。你说有很多数据。您可能会牺牲一个线程来无限期地阻塞和分派您的消息
  • 每个处理代理都有自己的 BCCS 或实现为调度程序向其发布消息的代理/参与者/MBP。在您的情况下,您只需要向一个处理器代理发送一条消息,因此您可以将处理代理存储在循环缓冲区中,以便始终将消息分派给最近最少使用的处理器。

像这样的东西:

            (data stream produces 'T)
                |
            [dispatcher's BCSC]
                |
            (a dispatcher thread consumes 'T  and pushes to processors, manages capacity of BCCS and LRU queue)
                 |                               |
            [processor1's BCCS/Actor/MBP] ... [processorN's BCCS/Actor/MBP]
                 |                               |
               (process)                         (process)
Run Code Online (Sandbox Code Playgroud)

您可能想阅读堆数据结构而不是 ConcurrentStack 。如果您需要通过消息的某些属性(例如时间戳)而不是通过它们到达堆栈的顺序来获取最新消息(例如,如果传输和到达顺序<>创建顺序可能存在延迟),您可以获得最新的消息使用堆消息。

如果您仍然需要代理语义/API,除了 Dave 的链接之外,您还可以阅读多个来源,并以某种方式采用多个并发消费者的实现:

  • Zach Bray 撰写的一篇关于高效 Actor 实现的有趣文章。在那里,您确实需要用一行或类似的// Might want to schedule this call on another thread.行替换(在注释下)该行,因为否则生产线程将消耗线程 - 对于单个快速生产者来说并不好。然而,对于如上所述的调度员来说,这正是所需要的。execute trueasync { execute true } |> Async.Start

  • FSharp.Actor(又名Fakka开发分支和 FSharp MPB 源代码(上面的第一个链接)对于实现细节可能非常有用。FSharp.Actors 库已经冻结了几个月,但开发分支有一些活动。

  • 在这种情况下,不应错过Google 网上论坛中有关 Fakka 的讨论

我有一个有点类似的用例,在过去的两天里,我研究了在 F# Agents/Actors 上能找到的所有内容。这个答案是我自己尝试这些想法的一种TODO,其中一半是在写它的时候诞生的。