Elixir:如何作为客户端使用服务器发送的事件流?

Top*_*unt 2 elixir server-sent-events

我的 Phoenix 应用程序需要与第 3 方 API 集成,该 API 通过Server-Sent Events以流格式发送数据。我是第一次了解SSE;SSE 流看起来很像 Websocket,只不过它们是单向的并且更简单。

我看到很多关于如何从 Elixir 应用程序作为服务器发送SSE数据的建议(例如 1、2、3),但没有看到关于 Elixir 应用程序如何作为客户端使用SSE 数据流的建议。我看到的文章似乎假设客户端是浏览器中的 JS,但我知道 Ruby SSE 客户端存在(1 , 2),所以我认为没有强有力的技术原因说明 Elixir 应用程序无法充当上交所客户端。

在 Elixir 应用程序中使用 SSE 数据的最简单方法是什么?我将不胜感激任何指示和资源。

Top*_*unt 6

我发现了几个 很有前途的SSE 客户端库,但由于奇怪的连接错误而关闭了。然后经过一番挖掘,我发现HTTPoison(大概还有 HTTPotion)可以完全独立地连接到服务器发送的事件流,只是没有在任何地方记录。

这是一个简单的iex示例(使用 Elixir v1.8 和 HTTPoison v1.5):

> url = "https://some-domain.com/some-server-sent-event-stream/"
> HTTPoison.get!(url, [], [recv_timeout: :infinity, stream_to: self()])
  => %HTTPoison.AsyncResponse{id: #Reference<0.2736682462.4075814917.25838>}
> Process.sleep(10_000) # wait for a few events to come in
> flush()
  => %HTTPoison.AsyncChunk{
        chunk: "event:poke\ndata:{\"kittens\":3}\n\n",
        id: #Reference<0.2736682462.4075814917.25838>
      }
      %HTTPoison.AsyncChunk{
        chunk: "event:poke\ndata:{\"kittens\":3}\n\n",
        id: #Reference<0.2736682462.4075814917.25838>
      }
      %HTTPoison.AsyncChunk{
        chunk: "event:poke\ndata:{\"kittens\":4}\n\n",
        id: #Reference<0.2736682462.4075814917.25838>
      }
      :ok
Run Code Online (Sandbox Code Playgroud)

但在现实世界中,您可能希望使用 GenServer 来处理每条消息、处理断开连接等,这本质上就是上面链接的两个库的用途。一个简单的客户端可能如下所示:

# Usage:
# > SseClient.start("https://some-domain.com/some-server-sent-event-stream/")
#
defmodule SseClient do
  use GenServer

  def start(url) do
    GenServer.start_link(__MODULE__, url: url)
  end

  def init([url: url]) do
    IO.puts "Connecting to stream..."
    HTTPoison.get!(url, [], [recv_timeout: :infinity, stream_to: self()])
    {:ok, nil}
  end

  def handle_info(%HTTPoison.AsyncChunk{chunk: chunk}, _state) do
    # My use case assumes that each message contains two rows (event: and data:)
    case Regex.run(~r/^event:(\w+)\ndata:({.+})\n\n$/, chunk) do
      [_, event, data] ->
        _json = Jason.decode!(data)
        case event do
          "poke" -> IO.puts "Poke received: #{data}"
          "data" -> IO.puts "Data received: #{data}"
        end
      nil ->
        raise "Don't know how to parse received chunk: \"#{chunk}\""
    end

    {:noreply, nil}
  end

  # In addition to message chunks, we also may receive status changes etc.
  def handle_info(%HTTPoison.AsyncStatus{} = status, _state) do
    IO.puts "Connection status: #{inspect status}"
    {:noreply, nil}
  end

  def handle_info(%HTTPoison.AsyncHeaders{} = headers, _state) do
    IO.puts "Connection headers: #{inspect headers}"
    {:noreply, nil}
  end
end
Run Code Online (Sandbox Code Playgroud)