Elixir工艺限制?

Aug*_*aza 3 elixir phoenix-framework

我想创建一个类似于此的Elixir代码:

def infinite_loop(created_workers \\ []) do
  case next_from_queue do
    {:ok, queue_msg} ->
      new_worker = Task.async(fn -> crawling(queue_msg) end)
      infinite_loop([new_worker | created_workers])
    {:error, :empty} ->
      created_workers.map(&Task.await/1)
  end
end
Run Code Online (Sandbox Code Playgroud)

假如说:

  1. crawling功能将创建另一个3Task
  2. 每个crawling 工人都可以跑3秒钟
  3. queue可能有上百万的消息

我怎么知道Elixir上并行处理的限制是什么?如何管理它不破坏?

Dog*_*ert 6

我建议使用Task.async_stream这个.Task.async_stream允许您并行处理流,同时限制并行运行的任务数.虽然Erlang 20中进程数的默认限制为262144,但如果您正在抓取某个站点,则可能需要更低的限制.

您可以使用以下函数从函数创建流,以继续使用Stream.iterate以下命令返回新项:

stream =
  Stream.iterate(next_from_queue(), fn _ -> next_from_queue() end)
  |> Stream.take_while(fn {:ok, _} -> true; {:error, :empty} -> false end)
Run Code Online (Sandbox Code Playgroud)

由于你想停下来{:error, :empty},我们Stream.take_while用来停止流.

然后Task.async_stream像这样使用:

stream
|> Task.async_stream(fn {:ok, queue_msg} ->
  crawling(queue_msg)
end, max_concurrency: 16)
Run Code Online (Sandbox Code Playgroud)

这将并行运行最多16个任务的流.最终结果将是所有返回值的列表crawling(queue_msg).