Aug*_*aza 3 elixir phoenix-framework
我想创建一个类似于此的Elixir代码:
def infinite_loop(created_workers \\ []) do
case next_from_queue do
{:ok, queue_msg} ->
new_worker = Task.async(fn -> crawling(queue_msg) end)
infinite_loop([new_worker | created_workers])
{:error, :empty} ->
created_workers.map(&Task.await/1)
end
end
Run Code Online (Sandbox Code Playgroud)
假如说:
crawling功能将创建另一个3Taskcrawling 工人都可以跑3秒钟queue可能有上百万的消息我怎么知道Elixir上并行处理的限制是什么?如何管理它不破坏?
我建议使用Task.async_stream这个.Task.async_stream允许您并行处理流,同时限制并行运行的任务数.虽然Erlang 20中进程数的默认限制为262144,但如果您正在抓取某个站点,则可能需要更低的限制.
您可以使用以下函数从函数创建流,以继续使用Stream.iterate以下命令返回新项:
stream =
Stream.iterate(next_from_queue(), fn _ -> next_from_queue() end)
|> Stream.take_while(fn {:ok, _} -> true; {:error, :empty} -> false end)
Run Code Online (Sandbox Code Playgroud)
由于你想停下来{:error, :empty},我们Stream.take_while用来停止流.
然后Task.async_stream像这样使用:
stream
|> Task.async_stream(fn {:ok, queue_msg} ->
crawling(queue_msg)
end, max_concurrency: 16)
Run Code Online (Sandbox Code Playgroud)
这将并行运行最多16个任务的流.最终结果将是所有返回值的列表crawling(queue_msg).
| 归档时间: |
|
| 查看次数: |
925 次 |
| 最近记录: |