GenServer上的Elixir非阻塞线程?

dek*_*eko 1 multithreading nonblocking elixir gen-server

我正在努力完成一项简单的任务,但我遇到了很大的困难.

请假设我有一个GenServer,其中一个回调如下:

  @impl true
  def handle_call(:state, _, state) do
    # Something that would require 10 seconds
    newState = do_job()
    {:reply, newState, newState}
  end
Run Code Online (Sandbox Code Playgroud)

如果我是对的,GenServer.call(:server, :state)从客户端调用会阻塞服务器10秒钟,然后新状态将返回给客户端.

好.我希望服务器在不被阻止的情况下处理此任务.我使用任务尝试过,但Task.await/2Task.yield/2阻塞服务器.

我希望服务器不要阻塞,并在那10秒后,在客户端终端上接收结果.这怎么可能?

7st*_*tud 8

如果我是对的,从客户端调用GenServer.call(:server,:state)会阻塞服务器10秒钟,然后新状态将返回给客户端.

是.Elixir做你告诉它做的事,并在这一行:

newState = do_job()
Run Code Online (Sandbox Code Playgroud)

你告诉elixir为do_job()变量赋予返回值newState.elixir执行该赋值的唯一方法是获取go_job().... 的返回值,这将花费10秒.

我希望服务器不要阻塞,并在那10秒后,在客户端终端上接收结果.

一种方法是GenServer向spawn()新进程执行10秒函数并将客户端的pid传递给新进程.当新进程从10秒函数获得返回值时,新进程可以send()使用客户端pid向客户端发送消息.

这意味着客户端需要调用handle_call()而不是handle_cast()因为服务器的实现handle_cast()没有from包含客户端pid 的参数变量.另一方面,handle_call() 确实from参数变量接收客户端pid ,因此服务器可以将客户端pid传递给生成的进程.注意spawn()立即返回,这意味着handle_call()可以立即返回类似的回复:working_on_it.

下一个问题是:客户端将如何知道GenServer生成的新进程何时完成执行10秒函数?客户端无法知道服务器上的某些无关进程何时完成执行,因此客户端需要等待接收,直到消息从生成的进程到达.并且,如果客户端正在检查其邮箱中的邮件,那么知道发件人是谁是有帮助的,这意味着handle_call()还应该将生成的进程的pid返回给客户端.客户端的另一个选择是在执行其他工作的比赛之间经常轮询其邮箱.为此,客户端可以在after子句中定义一个短暂超时的接收,然后调用一个函数after clause来执行一些客户端工作,然后对包含接收的函数进行递归调用,以便该函数再次检查邮箱.

那怎么样Task?根据任务文档:

如果您使用的是异步任务,则必须等待回复...

那么,如果你必须等待,那么异步任务有什么用呢?答案:如果进程至少有两个需要执行的长时间运行的函数,那么进程可以用来Task.async()同时运行所有函数,而不是执行一个函数并等到它完成,然后执行另一个函数并等到它完成,然后执行另一个,等等.

但是,Task还定义了一个start()函数:

开始(mod,fun,args)

开始一项任务.

这仅在任务用于副作用时使用(即对返回的结果不感兴趣),并且不应将其链接到当前进程.

这听起来像Task.start()完成了我在第一种方法中描述的内容.您需要定义fun以便它将运行10秒函数,然后在10秒函数执行完毕后将消息发送回客户端(= 副作用).

下面是一个生成长时间运行函数的GenServer的简单示例,它允许服务器在执行长时间运行的函数时保持对其他客户端请求的响应:

a.exs:

defmodule Gen1.Server do
  use GenServer

  @impl true
  def init(init_state) do
    {:ok, init_state}
  end

  def long_func({pid, _ref}) do
    Process.sleep 10_000
    result = :dog
    send(pid, {self(), result})
  end

  @impl true
  def handle_call(:go_long, from, state) do
    long_pid = spawn(Gen1.Server, :long_func, [from])
    {:reply, long_pid, state}
  end
  def handle_call(:other, _from, state) do
    {:reply, :other_stuff, state}
  end

end
Run Code Online (Sandbox Code Playgroud)

iex会话将是客户:

~/elixir_programs$ iex a.exs
Erlang/OTP 20 [erts-9.3] [source] [64-bit] [smp:4:4] [ds:4:4:10] [async-threads:10] [hipe] [kernel-poll:false]
Interactive Elixir (1.6.6) - press Ctrl+C to exit (type h() ENTER for help)

iex(1)> {:ok, server_pid} = GenServer.start_link(Gen1.Server, [])
{:ok, #PID<0.93.0>}

iex(2)> long_pid = GenServer.call(server_pid, :go_long, 15_000)
#PID<0.100.0>

iex(3)> GenServer.call(server_pid, :other)                       
:other_stuff

iex(4)> receive do                                             
...(4)> {^long_pid, reply} -> reply                            
...(4)> end                                                    
:dog

iex(7)> 
Run Code Online (Sandbox Code Playgroud)

变量就像long_pid匹配任何东西.要long_pid仅匹配其当前值,请指定^long_pid(^称为引脚运算符).

GenServer还允许您阻止客户端的调用,handle_call()同时允许服务器继续执行.如果客户端无法继续直到从服务器获取所需数据,但您希望服务器保持对其他客户端的响应,那么这很有用.这是一个例子:

defmodule Gen1.Server do
  use GenServer

  @impl true
  def init(init_state) do
    {:ok, init_state}
  end

  @impl true
  def handle_call(:go_long, from, state) do
    spawn(Gen1.Server, :long_func, [from])
    {:noreply, state}  #The server doesn't send anything to the client, 
                       #so the client's call of handle_call() blocks until 
                       #somebody calls GenServer.reply().
  end

  def long_func(from) do
    Process.sleep 10_000
    result = :dog
    GenServer.reply(from, result) 
  end

end
Run Code Online (Sandbox Code Playgroud)

在iex中:

iex(1)> {:ok, server_pid} = GenServer.start_link(Gen1.Server, [])
{:ok, #PID<0.93.0>}

iex(2)> result = GenServer.call(server_pid, :go_long, 15_000)
...hangs for 10 seconds...   
:dog

iex(3)> 
Run Code Online (Sandbox Code Playgroud)