如何处理进程'消息队列溢出?

Pro*_*ist -1 elixir

我是Elixir的新手,之前我从未遇到过这个问题.我很好奇处理这类问题的最佳或可接受的方式是什么?

spawn是一个从websocket接收数据的进程,然后将send这些数据传回给父进程.父运行一个递归process()函数,该函数receive来自生成的进程.

process函数与数据匹配,并且在大多数情况下运行回调函数.由于它处理数据,因此该回调函数可能位于较重的一侧.

通过删除大量Logger.info语句,我能够发现生成的进程从websocket接收数据并将send此数据传递给父进程,父进程实际上并没有处理邮箱.

我使用该alive?/1函数确定父进程是活动的,后来我使用该函数检查了邮箱的状态Process.info/1- 它显示邮箱消息的数量正在增长(数百条消息).

我的工作解决方案是,而不是在process函数中调用回调spawn,我相信它允许父进程更快地处理其邮箱.

处理这种情况的其他更好的方法是什么?

def run(url, callback) do
    {domain, path} = parse_url(url)
    socket = Socket.Web.connect!(domain, path: path, secure: true)
    spawn_link(Project.WebsocketClient, :listen, [url, socket, self])

    process(callback)
end

def process(callback) do
    receive do
        {:ok, data} ->
            callback.(data)
            # spawn(fn -> callback.(data) end) - my fix.
        {:ping} ->
            Logger.info("Pong")
        {:error, _, url} ->
            run(url, callback)
    end
    process(callback)
end

defp recv(socket) do
    try do
        Socket.Web.recv!(socket)
    rescue
        e in RuntimeError -> {:error, e}
    end
end

def listen(url, socket, pid) do
    case recv(socket) do
        {:text, data} ->
            send(pid, {:ok, data})
        {:ping, _} ->
            Logger.info("Ping!")
            Socket.Web.send!(socket, {:pong, ""})
            send(pid, {:ping})
        {:error, e} ->
            Logger.warn("Websocket died because: #{inspect(e)}. Attempting to restart")
            send(pid, {:error, e, url})
            exit(:died)
    end
    listen(url, socket, pid)
end
Run Code Online (Sandbox Code Playgroud)

tko*_*wal 7

这是一个开放式的问题,所以我将分享我对这个问题的看法.

  1. 为什么邮箱中有邮件?

receive有选择性.如果其他人正在向进程发送消息并且它们与这些模式中的任何一个都不匹配,则消息可能会累积.记录这些消息甚至崩溃过程总是一个好主意:

receive do
  (...)
  other -> log_somewhere_or_crash(other)
end
Run Code Online (Sandbox Code Playgroud)
  1. 如果使用spawn帮助你,可能邮箱没有错误的消息混乱.

spawn这不是一个坏主意,但你无法控制创建的进程数.当您创建数百万个内存时,可能会耗尽内存.使用poolboy可能是个好主意.您可以定义所需的工人数量.

在使用池中的工作程序之前,您必须将其检出.完成工作后,您必须将其检入池中.如果所有工作人员都忙,并且您使用了阻止API,那么调用进程将等待.它会将队列从"进程进程"移动到"侦听进程".

  1. 使用GenServer.

您已GenServer使用process函数重新实现了该行为.GenServer确切地说,你想要什么,但通常更容易测试和调试.您只需指定回调.它也可以很容易地监督.

  1. 当系统超载时,没有什么能帮到你.

如果处理繁重且您的消息不断出现,它们将在某处缓冲.现在它在父进程中.如果使用spawn,它们将在调度程序队列中等待的许多进程中进行缓冲.如果减慢从WebSocket接收消息的速度,它们将存储在TCP缓冲区中,这也可能会溢出.这样,发件人将等待TCP窗口.如果它在一个单独的线程中生成消息,它们将在那里累积.

最好有一种流量控制机制,通知生产者减慢或丢弃无法处理的消息.

关于它的博客文章真的很棒.