Elixir:Genserver.call没有初始化handle_call

COS*_*STA 1 erlang elixir erlang-otp gen-server gossip

我正在实现Gossip Algorithm其中多个演员同时传播八卦的同时.当每个演员都听了八次Gossip时,系统停止.

现在,我有一个场景,我在发送八卦之前检查收件人演员的收听计数.如果监听计数已经是10,则不会将八卦发送给收件人.我这样做是使用同步调用来获取监听计数.

def get_message(server, msg) do
    GenServer.call(server, {:get_message, msg})
end

def handle_call({:get_message, msg}, _from, state) do
    listen_count = hd(state) 
    {:reply, listen_count, state}
end
Run Code Online (Sandbox Code Playgroud)

该程序在启动时运行良好但经过一段时间Genserver.call停止时出现超时错误,如下所示.经过一些调试,我意识到它Genserver.call变得休眠并且无法启动相应的handle_call方法.使用同步调用时是否需要此行为?由于所有参与者都是独立的,因此不应该在Genserver.call不等待彼此响应的情况下独立运行方法.

02:28:05.634 [error] GenServer #PID<0.81.0> terminating
    ** (stop) exited in: GenServer.call(#PID<0.79.0>, {:get_message, []}, 5000)
    ** (EXIT) time out
    (elixir) lib/gen_server.ex:774: GenServer.call/3
Run Code Online (Sandbox Code Playgroud)

编辑:以下代码可以在iex shell中运行时重现错误.

defmodule RumourActor do
use GenServer

def start_link(opts) do
    {:ok, pid} = GenServer.start_link(__MODULE__,opts)
    {pid}
end

def set_message(server, msg, recipient) do      
    GenServer.cast(server, {:set_message, msg, server, recipient})
end

def get_message(server, msg) do
    GenServer.call(server, :get_message)
end

def init(opts) do
    state=opts
    {:ok,state}
end

def handle_cast({:set_message, msg, server, recipient},state) do
  :timer.sleep(5000)
  c = RumourActor.get_message(recipient, [])
  IO.inspect c
  {:noreply,state}
end

def handle_call(:get_message, _from, state) do
    count = tl(state)
    {:reply, count, state}
end
end
Run Code Online (Sandbox Code Playgroud)

打开iex shell并加载模块上方.使用以下方法启动两个

a = RumourActor.start_link(["", 3])
b = RumourActor.start_link(["", 5])
Run Code Online (Sandbox Code Playgroud)

通过调用Dogbert在评论中提到的死锁条件产生错误.运行以下没有太大的时间差异.

cb = RumourActor.set_message(elem(a,0), [], elem(b,0))
ca = RumourActor.set_message(elem(b,0), [], elem(a,0))
Run Code Online (Sandbox Code Playgroud)

等待5秒钟.将出现错误.

zxq*_*xq9 6

八卦协议是一种处理异步,未知,未配置(随机)网络的方法,这些网络可能遭受间歇性中断和分区,并且不存在领导或默认结构.(请注意,这种情况在现实世界中有点不寻常,并且带外控制总是以某种方式强加给系统.)

考虑到这一点,让我们将其改为异步系统(使用cast),以便我们遵循聊天八卦风格传播的精神.

我们需要消息的摘要,这些消息计算接收到给定消息的次数,已接收的消息摘要以及已经超过幻数的消息(因此如果延迟,我们不会重新发送一个消息),以及我们系统中注册的进程列表,以便我们知道我们正在向谁广播:

(以下示例是在Erlang中,因为自从我停止使用它以来,我只是跳过Elixir语法...)

-module(rumor).

-record(s,
        {peers  = []         :: [pid()],
         digest = #{}        :: #{message_id(), non_neg_integer()},
         dead   = sets:new() :: sets:set(message_id())}).

-type message_id() :: zuuid:uuid().
Run Code Online (Sandbox Code Playgroud)

在这里我使用的是UUID,但它可能是任何东西.Erlang参考对于测试用例来说没问题,但是由于八卦在Erlang集群中没有用,并且引用在原始系统之外是不安全的,我只是跳到假设这是针对网络系统的.

我们需要一个接口函数,它允许我们告诉进程将新消息注入系统.我们还需要一个接口函数,它可以在两个进程已经在系统中之间发送消息.然后我们需要一个内部函数,向所有已知(订阅)的对等体广播消息.啊,这意味着我们需要一个问候界面,以便对等进程可以相互通知他们的存在.

我们还想要一种方法让一个过程告诉自己随着时间的推移继续广播.设置重传间隔多长时间实际上并不是一个简单的决定 - 它与网络拓扑,延迟,可变性等有关(你实际上可能偶尔会ping对等体并根据延迟开发一些启发式,丢弃同行似乎没有反应,等等 - 但我们不会在这里陷入疯狂的境地).在这里,我只是将它设置为1秒,因为这是人类观察系统的易于理解的间隔.

请注意,以下所有内容都是异步

接口...

insert(Pid, Message) ->
    gen_server:cast(Pid, {insert, Message}).

relay(Pid, ID, Message) ->
    gen_server:cast(Pid, {relay, ID, Message}).

greet(Pid) ->
    gen_server:cast(Pid, {greet, self()}).

make_introduction(Pid, PeerPid) ->
    gen_server:cast(Pid, {make_introduction, PeerPid}).
Run Code Online (Sandbox Code Playgroud)

最后一个函数将成为我们作为系统测试人员的方式,使其中一个进程调用greet/1某个目标Pid,以便他们开始构建对等网络.在现实世界中,通常会有一些不同的东西.

在我们的gen_server回调中,我们将获得:

handle_cast({insert, Message}, State) ->
    NewState = do_insert(Message, State);
    {noreply, NewState};
handle_cast({relay, ID, Message}, State) ->
    NewState = do_relay(ID, Message, State),
    {noreply, NewState};
handle_cast({greet, Peer}, State) ->
    NewState = do_greet(Peer, State),
    {noreply, NewState};
handle_cast({make_introduction, Peer}, State) ->
    NewState = do_make_introduction(Peer, State),
    {noreply, NewState}.
Run Code Online (Sandbox Code Playgroud)

非常简单的东西.

上面我提到我们需要一种方法让这件事告诉自己在延迟后重新发送.要做到这一点,我们将在延迟使用后向"redo_relay"发送一条裸信息,erlang:send_after/3因此我们需要一个handle_info/2来处理它:

handle_info({redo_relay, ID, Message}, State) ->
    NewState = do_relay(ID, Message, State),
    {noreply, NewState}.
Run Code Online (Sandbox Code Playgroud)

消息位的实现是有趣的部分,但这些都不是非常棘手.请原谅do_relay/3下面 - 它可能更简洁,但我在浏览器中写下这个,所以......

do_insert(Message, State = #s{peers = Peers, digest = Digest}) ->
    MessageID = zuuid:v1(),
    NewDigest = maps:put(MessageID, 1, Digest),
    ok = broadcast(Message, Peers),
    ok = schedule_resend(MessageID, Message),
    State#s{digest = NewDigest}.

do_relay(ID,
         Message,
         State = #s{peers = Peers, digest = Digest, dead = Dead}) ->
    case maps:find(ID, Digest) of
        {ok, Count} when Count >= 10 ->
            NewDigest = maps:remove(ID, Digest),
            NewDead = sets:add_element(ID, Dead),
            ok = broadcast(Message, Peers),
            State#s{digest = NewDigest, dead = NewDead};
        {ok, Count} ->
            NewDigest = maps:put(ID, Count + 1),
            ok = broadcast(ID, Message, Peers),
            ok = schedule_resend(ID, Message),
            State#s{digest = NewDigest};
        error ->
            case set:is_element(ID, Dead) of
                true ->
                    State;
                false ->
                    NewDigest = maps:put(ID, 1),
                    ok = broadcast(Message, Peers),
                    ok = schedule_resend(ID, Message),
                    State#s{digest = NewDigest}
            end
    end.

broadcast(ID, Message, Peers) ->
    Forward = fun(P) -> relay(P, ID, Message),
    lists:foreach(Forward, Peers).

schedule_resend(ID, Message) ->
    _ = erlang:send_after(1000, self(), {redo_relay, ID, Message}),
    ok.
Run Code Online (Sandbox Code Playgroud)

现在我们需要社交位......

do_greet(Peer, State = #s{peers = Peers}) ->
    case lists:member(Peer, Peers) of
        false -> State#s{peers = [Peer | Peers]};
        true  -> State
    end.

do_make_introduction(Peer, State = #s{peers = Peers}) ->
    ok = greet(Peer),
    do_greet(Peer, State).
Run Code Online (Sandbox Code Playgroud)

那么那里所有可怕的非常规类型的东西做了什么呢?

它避免了任何僵局的可能性.在同行系统中死锁是如此,致命的原因是,无论何时你有两个相同的进程(或演员,或其他)同步进行通信,你已经创建了一个潜在死锁的教科书案例.

任何时候A都有一个同步消息朝向B并且B同时发出同步消息,A同时你现在遇到了死锁.没有办法创建相同的进程,可以同步调用彼此而不会产生潜在的死锁.在大规模并发系统中,任何可能发生的事情几乎肯定会最终发生,所以你迟早会遇到这种情况.

Gossip的目的是异步的原因:它是一种草率,不可靠,低效的方式来处理草率,不可靠,低效的网络拓扑.尝试拨打电话,而不是强制转换,不仅违背了八卦式的消息中继的目的,这推动你成为不可能的僵局领土事发地改变从非同步协议,以同步的性质.

  • 作为一个设计指南你应该:**总是尝试做一切异步开始,然后只有当你已经证明你真的需要时才转移到同步通信.**大约1%的时间你需要一个电话但是由于某种原因无法管理它,并且会结束标记的呼叫队列,但这实际上非常罕见. (2认同)