Top*_*unt 2 elixir server-sent-events
我的 Phoenix 应用程序需要与第 3 方 API 集成,该 API 通过Server-Sent Events以流格式发送数据。我是第一次了解SSE;SSE 流看起来很像 Websocket,只不过它们是单向的并且更简单。
我看到很多关于如何从 Elixir 应用程序作为服务器发送SSE数据的建议(例如 1、2、3),但我没有看到关于 Elixir 应用程序如何作为客户端使用SSE 数据流的建议。我看到的文章似乎假设客户端是浏览器中的 JS,但我知道 Ruby SSE 客户端存在(1 , 2),所以我认为没有强有力的技术原因说明 Elixir 应用程序无法充当上交所客户端。
在 Elixir 应用程序中使用 SSE 数据的最简单方法是什么?我将不胜感激任何指示和资源。
我发现了几个 很有前途的SSE 客户端库,但由于奇怪的连接错误而关闭了。然后经过一番挖掘,我发现HTTPoison(大概还有 HTTPotion)可以完全独立地连接到服务器发送的事件流,只是没有在任何地方记录。
这是一个简单的iex示例(使用 Elixir v1.8 和 HTTPoison v1.5):
> url = "https://some-domain.com/some-server-sent-event-stream/"
> HTTPoison.get!(url, [], [recv_timeout: :infinity, stream_to: self()])
=> %HTTPoison.AsyncResponse{id: #Reference<0.2736682462.4075814917.25838>}
> Process.sleep(10_000) # wait for a few events to come in
> flush()
=> %HTTPoison.AsyncChunk{
chunk: "event:poke\ndata:{\"kittens\":3}\n\n",
id: #Reference<0.2736682462.4075814917.25838>
}
%HTTPoison.AsyncChunk{
chunk: "event:poke\ndata:{\"kittens\":3}\n\n",
id: #Reference<0.2736682462.4075814917.25838>
}
%HTTPoison.AsyncChunk{
chunk: "event:poke\ndata:{\"kittens\":4}\n\n",
id: #Reference<0.2736682462.4075814917.25838>
}
:ok
Run Code Online (Sandbox Code Playgroud)
但在现实世界中,您可能希望使用 GenServer 来处理每条消息、处理断开连接等,这本质上就是上面链接的两个库的用途。一个简单的客户端可能如下所示:
# Usage:
# > SseClient.start("https://some-domain.com/some-server-sent-event-stream/")
#
defmodule SseClient do
use GenServer
def start(url) do
GenServer.start_link(__MODULE__, url: url)
end
def init([url: url]) do
IO.puts "Connecting to stream..."
HTTPoison.get!(url, [], [recv_timeout: :infinity, stream_to: self()])
{:ok, nil}
end
def handle_info(%HTTPoison.AsyncChunk{chunk: chunk}, _state) do
# My use case assumes that each message contains two rows (event: and data:)
case Regex.run(~r/^event:(\w+)\ndata:({.+})\n\n$/, chunk) do
[_, event, data] ->
_json = Jason.decode!(data)
case event do
"poke" -> IO.puts "Poke received: #{data}"
"data" -> IO.puts "Data received: #{data}"
end
nil ->
raise "Don't know how to parse received chunk: \"#{chunk}\""
end
{:noreply, nil}
end
# In addition to message chunks, we also may receive status changes etc.
def handle_info(%HTTPoison.AsyncStatus{} = status, _state) do
IO.puts "Connection status: #{inspect status}"
{:noreply, nil}
end
def handle_info(%HTTPoison.AsyncHeaders{} = headers, _state) do
IO.puts "Connection headers: #{inspect headers}"
{:noreply, nil}
end
end
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1097 次 |
| 最近记录: |