如何等待Elixir中的多个任务?

Iva*_*ang 7 erlang task elixir async-await

我想同时执行多个任务.在Javascript中,我会这样做:

async function cook_an_egg() {}

async function take_shower() {}

async function call_mum() {}

await Promise.all([cook_an_egg(), take_shower(), call_mum()])
Run Code Online (Sandbox Code Playgroud)

如何Promise.all在Elixir Task模块中实现?从文档来看,似乎只能完成await1项任务; 在每个内部定义1个函数task; 并且仅对多个项目应用相同的功能async_stream.

Ian*_*ung 21

对于 Elixir v1.11.0 及更高版本

Task.await_many旨在做到这一点。它正确处理整体超时,并且在遇到退出、超时等情况时应该做最不令人惊讶的事情。

tasks = [
  Task.async(fn -> cook_an_egg(:medium) end),
  Task.async(fn -> take_shower(10) end),
  Task.async(fn -> call_mum() end),
]

Task.await_many(tasks)
Run Code Online (Sandbox Code Playgroud)

对于旧版本

比A更防弹溶液Task.awaitTask.yield_many。不幸的是,它有点冗长,因为它让我们自己负责处理超时和死任务。如果我们想模仿async/的行为await并在出现问题时退出,它将如下所示:

tasks = [
  Task.async(fn -> cook_an_egg(:medium) end),
  Task.async(fn -> take_shower(10) end),
  Task.async(fn -> call_mum() end),
]

Task.yield_many(tasks)
|> Enum.map(fn {task, result} ->
  case result do
    nil ->
      Task.shutdown(task, :brutal_kill)
      exit(:timeout)
    {:exit, reason} ->
      exit(reason)
    {:ok, result} ->
      result
  end
end)
Run Code Online (Sandbox Code Playgroud)

为什么不使用await

Task.await在简单的情况下使用可以工作,但如果你关心超时,你可能会遇到麻烦。跨列表映射按顺序发生,这意味着每个Task.await将在给出结果之前阻塞指定的超时时间,此时我们移动到列表中的下一项并再次阻塞直到完全超时。

我们可以通过创建休眠 1-8 秒的任务列表来演示这种行为。默认超时为 5 秒,当直接使用 调用时,其中一些任务将被终止await,但是当我们在列表中枚举时,不会发生这种情况:

for ms <- [2_000, 4_000, 6_000] do
  Task.async(fn -> Process.sleep(ms); ms end)
end
|> Enum.map(&Task.await/1)

# Blocks for 6 seconds
# => [2000, 4000, 6000]

# Each `await` picks up after the previous one finishes with a fresh 5s timeout.
# Since each one blocks for 2s before finishing, no timeout is triggered
# but the total run time runs over.
 
# async(2s)--await(2s)-->(2s)
# async(4s)                  --await(2s)-->(4s)
# async(6s)                                    --await(2s)-->(6s)
Run Code Online (Sandbox Code Playgroud)

如果我们将其修改为 use Task.yield_many,我们可以获得所需的行为:

for ms <- [2_000, 4_000, 6_000] do
  Task.async(fn -> Process.sleep(ms); ms end)
end
|> Task.yield_many(5000)
|> Enum.map(fn {t, res} -> res || Task.shutdown(t, :brutal_kill) end)

# Blocks for 5 seconds
# => [{:ok, 2000}, {:ok, 4000}, nil]
Run Code Online (Sandbox Code Playgroud)


小智 13

您可以将await函数映射到任务引用列表.就像是

tasks = Enum.reduce(0..9, [], fn _, acc -> 
  [Task.async(&any_job/0) | acc]
end)

Enum.map(tasks, &Task.await/1)
Run Code Online (Sandbox Code Playgroud)

  • 有关系吗?假设第一个过程花费的时间最长 - 所以等待它,然后当第一个等待完成时,所有其他过程在完成过程后立即完成. (3认同)
  • 有趣的是,我希望 map 函数中的第一个 `Task.await` 会阻止进程调用下一个 `await`,直到它完成。“Task.await”没有在设计上阻止任何“Enum”方法,我说得对吗? (2认同)