tko*_*wal 13 parallel-processing elixir
我想在一个大清单上做一个平行地图.代码看起来有点像这样:
big_list
|> Stream.map(&Task.async(Module, :do_something, [&1]))
|> Stream.map(&Task.await(&1))
|> Enum.filter filter_fun
Run Code Online (Sandbox Code Playgroud)
但是我正在检查Stream实现,据我所知,它Stream.map结合了函数并将组合函数应用于流中的元素,这意味着序列是这样的:
在这种情况下,它不会并行执行.我是对的还是我错过了什么?
如果我是对的,那么这段代码呢?
Stream.map Task.async ...
|> Enum.map Task.await ...
Run Code Online (Sandbox Code Playgroud)
这是否会并行运行?
Paw*_*rok 13
第二个也没有做你想要的.您可以使用以下代码清楚地看到它:
defmodule Test do
def test do
[1,2,3]
|> Stream.map(&Task.async(Test, :job, [&1]))
|> Enum.map(&Task.await(&1))
end
def job(number) do
:timer.sleep 1000
IO.inspect(number)
end
end
Test.test
Run Code Online (Sandbox Code Playgroud)
你会看到一个数字,然后是1秒等待,另一个数字,依此类推.这里的关键是你想尽快创建任务,所以你根本不应该使用懒惰Stream.map.而是Enum.map在那一点上使用渴望:
|> Enum.map(&Task.async(Test, :job, [&1]))
|> Enum.map(&Task.await(&1))
Run Code Online (Sandbox Code Playgroud)
另一方面,你可以Stream.map在等待时使用,只要你稍后做一些急切的操作,就像你的filter.通过这种方式,等待将穿插您可能对结果进行的任何处理.
Elixir 1.4提供了新的Task.async_stream/5函数,该函数将返回在枚举中的每个项目上并发运行给定函数的流.
还可以使用:max_concurrency和:timeoutoptions参数指定最大工作数和超时.
请注意,您不必等待此任务,因为该函数返回一个流,因此您可以使用Enum.to_list/1或使用Stream.run/1.
这将使您的示例并发运行:
big_list
|> Task.async_stream(Module, :do_something, [])
|> Enum.filter(filter_fun)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
4948 次 |
| 最近记录: |