在Task.async_stream选项中,:timeout描述了参数:
允许每个任务执行的最长时间(以毫秒为单位).默认为5000
在我的测试中,我做了以下事情:
iex(8)> Task.async_stream([10, 4, 5], fn i -> :timer.sleep(i * 1000); i end) |> Enum.to_list
[ok: 10, ok: 4, ok: 5]
iex(10)> Task.async_stream([10], fn i -> :timer.sleep(i * 1000); i end) |> Enum.to_list
** (exit) exited in: Task.Supervised.stream(5000)
** (EXIT) time out
(elixir) lib/task/supervised.ex:209: Task.Supervised.stream_reduce/10
(elixir) lib/enum.ex:1776: Enum.reverse/2
(elixir) lib/enum.ex:2528: Enum.to_list/1
Run Code Online (Sandbox Code Playgroud)
为什么第一个例子没有超时(但需要大约10秒才能执行),而第二个例子表现出预期的超时行为?
的实现Task.async_stream从1.4.5更改为1.5.1。
让我们看看会发生什么。
在此版本中,超时是s块的一部分receiveafter。
receive do
{{^monitor_ref, position}, value} ->
# ...
{:down, {^monitor_ref, position}, reason} ->
# ...
{:DOWN, ^monitor_ref, _, ^monitor_pid, reason} ->
# ...
after
timeout ->
# ...
end
Run Code Online (Sandbox Code Playgroud)
该receive块的作用是等待监控进程发送的衍生任务的更新消息。为了简单起见,我截断了代码。
这在应用场景中意味着什么?仅当在几毫秒的时间内没有收到来自生成任务的消息时Task.async_stream才会超时。timeout
让我们尝试使用您的示例[10, 3, 4]:
iex> Task.async_stream([10, 3, 4], fn i -> :timer.sleep(i * 1000); i end) |> Enum.to_list
** (exit) exited in: Task.Supervised.stream(5000)
** (EXIT) time out
(elixir) lib/task/supervised.ex:209: Task.Supervised.stream_reduce/10
(elixir) lib/enum.ex:1776: Enum.reverse/2
(elixir) lib/enum.ex:2528: Enum.to_list/1
Run Code Online (Sandbox Code Playgroud)
正如我们所看到的,这会导致超时,正如预期的那样。
现在,如果我们尝试使用[10, 5],这会起作用吗?
iex> Task.async_stream([10, 5], fn i -> :timer.sleep(i * 1000); i end) |> Enum.to_list()
** (exit) exited in: Task.Supervised.stream(5000)
** (EXIT) time out
(elixir) lib/task/supervised.ex:209: Task.Supervised.stream_reduce/10
(elixir) lib/enum.ex:1776: Enum.reverse/2
(elixir) lib/enum.ex:2528: Enum.to_list/1
Run Code Online (Sandbox Code Playgroud)
看来初始任务花费的时间太长,超时时间为 5 秒。但只要我们添加一个中间步骤,它就会起作用。怎么样1?
iex> Task.async_stream([10, 5, 1], fn i -> :timer.sleep(i * 1000); i end) |> Enum.to_list()
[ok: 10, ok: 5, ok: 1]
Run Code Online (Sandbox Code Playgroud)
在 Elixir 1.5.1 中,超时逻辑的工作方式有所不同。它用于Process.send_after向监控进程发送每个生成的任务的超时消息。
# Schedule a timeout message to ourselves, unless the timeout was set to :infinity
timer_ref = case timeout do
:infinity -> nil
timeout -> Process.send_after(self(), {:timeout, {monitor_ref, ref}}, timeout)
end
Run Code Online (Sandbox Code Playgroud)
然后,该消息在生成任务并发送:timeout消息的同一接收中进行处理。
一旦单个进程花费的时间超过指定的超时时间,整个流就会崩溃,正如它应该的那样。
iex> Task.async_stream([10, 5, 1], fn i -> :timer.sleep(i * 1000); i end) |> Enum.to_list()
** (exit) exited in: Task.Supervised.stream(5000)
** (EXIT) time out
(elixir) lib/task/supervised.ex:237: Task.Supervised.stream_reduce/7
(elixir) lib/enum.ex:1847: Enum.reverse/1
(elixir) lib/enum.ex:2596: Enum.to_list/1
Run Code Online (Sandbox Code Playgroud)
Elixir 1.4.5 在收到衍生进程的结果后重新跟踪超时。Elixir 1.5.1 为每个生成的进程单独跟踪它。
| 归档时间: |
|
| 查看次数: |
504 次 |
| 最近记录: |