如何设置Phoenix PubSub订阅者回调

Mat*_*don 0 elixir publish-subscribe redis phoenix-framework

我有一个相当简单的要求,围绕凤凰城内置的2个服务(现在):

ServiceA负责注册用户.注册用户时,ServiceA广播包含有关新创建用户的信息的消息.现在使用Controller操作中的以下代码完成此操作:

ServiceA.Endpoint.broadcast("activity:all", "new:user", %{email: "test@test.com"})

ServiceB 负责监听所有这些活动广播并与他们合作(基本上建立一个活动源).

我遇到了一个绊脚石,因为我可以看到ServiceA向Redis播放消息(使用Phoenix.PubSub.Redis),但不完全了解如何让订阅者ServiceB处理它...

下面这段代码是据我已经成功地获得,这确实的东西时,广播消息,然后抛出一个例外.

部分订户模块

defmodule ServiceB.UserSubscriber do

  def start_link do
    sub = spawn_link &(process_feed/0)
    ServiceB.Endpoint.subscribe(:user_pubsub, "activity:all")
    {:ok, sub}
  end

  def process_feed do
    receive do
      params ->
        IO.inspect "processing goes here..."
    end
    process_feed
  end

end
Run Code Online (Sandbox Code Playgroud)

例外

[error] GenServer :user_pubsub terminating
** (FunctionClauseError) no function clause matching in Phoenix.PubSub.RedisServer.handle_info/2
Run Code Online (Sandbox Code Playgroud)

我猜我在GenServer某个地方错过了一大堆工作,但似乎无法在网上发现任何暗示在哪里.

Mat*_*don 8

问题(正如预期的那样)是我的订阅者模块没有实现为GenServer,但我试图复制相同的功能(并且非常糟糕!).按如下方式更新我的订阅者模型已经成功了:

defmodule SubscriberService.ActivitySubscriber do
  use GenServer

  def start_link(channel) do
    GenServer.start_link(__MODULE__, channel)
  end

  def init(channel) do
    pid = self
    ref = SubscriberService.Endpoint.subscribe(pid, channel)
    {:ok, {pid, channel, ref}}
  end

  def handle_info(%{event: "new:user"} = message, state) do
    IO.inspect "#######################"
    IO.inspect "New User - Received Message:"
    IO.inspect message
    IO.inspect "#######################"
    {:noreply, state}
  end

  def handle_info(message, state) do
    IO.inspect "#######################"
    IO.inspect "Catch All - Received Message:"
    IO.inspect message
    IO.inspect "#######################"
    {:noreply, state}
  end
end
Run Code Online (Sandbox Code Playgroud)

如您所见,init/1触发订阅,handle_info/2函数接收传入消息.

如果你想看看它如何在它的所有荣耀(发布者和订阅者服务)中工作,请看一下repo.