Task.async_stream超时行为

Tyl*_*ler 5 elixir

Task.async_stream选项中,:timeout描述了参数:

允许每个任务执行的最长时间(以毫秒为单位).默认为5000

在我的测试中,我做了以下事情:

iex(8)> Task.async_stream([10, 4, 5], fn i -> :timer.sleep(i * 1000); i end) |> Enum.to_list
[ok: 10, ok: 4, ok: 5]

iex(10)> Task.async_stream([10], fn i -> :timer.sleep(i * 1000); i end) |> Enum.to_list      
** (exit) exited in: Task.Supervised.stream(5000)
    ** (EXIT) time out
    (elixir) lib/task/supervised.ex:209: Task.Supervised.stream_reduce/10
    (elixir) lib/enum.ex:1776: Enum.reverse/2
    (elixir) lib/enum.ex:2528: Enum.to_list/1
Run Code Online (Sandbox Code Playgroud)

为什么第一个例子没有超时(但需要大约10秒才能执行),而第二个例子表现出预期的超时行为?

Sas*_*olf 6

的实现Task.async_stream从1.4.5更改为1.5.1。

让我们看看会发生什么。

长生不老药 1.4.5

在此版本中,超时是s的一部分receiveafter

receive do
  {{^monitor_ref, position}, value} ->
    # ...

  {:down, {^monitor_ref, position}, reason} ->
    # ...

  {:DOWN, ^monitor_ref, _, ^monitor_pid, reason} ->
    # ...
after
  timeout ->
    # ...
end
Run Code Online (Sandbox Code Playgroud)

receive块的作用是等待监控进程发送的衍生任务的更新消息。为了简单起见,我截断了代码。

这在应用场景中意味着什么?仅当在几毫秒的时间内没有收到来自生成任务的消息时Task.async_stream才会超时。timeout

例子

让我们尝试使用您的示例[10, 3, 4]

iex> Task.async_stream([10, 3, 4], fn i -> :timer.sleep(i * 1000); i end) |> Enum.to_list
** (exit) exited in: Task.Supervised.stream(5000)
    ** (EXIT) time out
    (elixir) lib/task/supervised.ex:209: Task.Supervised.stream_reduce/10
    (elixir) lib/enum.ex:1776: Enum.reverse/2
    (elixir) lib/enum.ex:2528: Enum.to_list/1
Run Code Online (Sandbox Code Playgroud)

正如我们所看到的,这会导致超时,正如预期的那样。

现在,如果我们尝试使用[10, 5],这会起作用吗?

iex> Task.async_stream([10, 5], fn i -> :timer.sleep(i * 1000); i end) |> Enum.to_list()
** (exit) exited in: Task.Supervised.stream(5000)
    ** (EXIT) time out
    (elixir) lib/task/supervised.ex:209: Task.Supervised.stream_reduce/10
    (elixir) lib/enum.ex:1776: Enum.reverse/2
    (elixir) lib/enum.ex:2528: Enum.to_list/1
Run Code Online (Sandbox Code Playgroud)

看来初始任务花费的时间太长,超时时间为 5 秒。但只要我们添加一个中间步骤,它就会起作用。怎么样1

iex> Task.async_stream([10, 5, 1], fn i -> :timer.sleep(i * 1000); i end) |> Enum.to_list()
[ok: 10, ok: 5, ok: 1]
Run Code Online (Sandbox Code Playgroud)

长生不老药 1.5.1

在 Elixir 1.5.1 中,超时逻辑的工作方式有所不同。它用于Process.send_after向监控进程发送每个生成的任务的超时消息。

# Schedule a timeout message to ourselves, unless the timeout was set to :infinity
timer_ref = case timeout do
  :infinity -> nil
  timeout -> Process.send_after(self(), {:timeout, {monitor_ref, ref}}, timeout)
end
Run Code Online (Sandbox Code Playgroud)

然后,该消息在生成任务并发送:timeout消息的同一接收中进行处理。

完整功能的链接。

例子

一旦单个进程花费的时间超过指定的超时时间,整个流就会崩溃,正如它应该的那样。

iex> Task.async_stream([10, 5, 1], fn i -> :timer.sleep(i * 1000); i end) |> Enum.to_list()
** (exit) exited in: Task.Supervised.stream(5000)
    ** (EXIT) time out
    (elixir) lib/task/supervised.ex:237: Task.Supervised.stream_reduce/7
    (elixir) lib/enum.ex:1847: Enum.reverse/1
    (elixir) lib/enum.ex:2596: Enum.to_list/1
Run Code Online (Sandbox Code Playgroud)

长话短说

Elixir 1.4.5 在收到衍生进程的结果后重新跟踪超时。Elixir 1.5.1 为每个生成的进程单独跟踪它。