服务器将数据从Clojure推送到ClojureScript

mik*_*era 10 web-services clojure push-notification clojurescript core.async

我正在Clojure中编写一个应用服务器,它将在客户端上使用ClojureScript.

我想找到一种有效的,惯用的方式将数据从服务器推送到客户端作为实时事件,理想情况下使用以下组合:

  • HTTP的试剂盒
  • core.async

(但我对其他可能性持开放态度)

有人能提供一个很好的例子/方法吗?

llj*_*098 5

我更喜欢使用aleph,这里是wiki,你可以简单地使用wrap-ring-handler函数来包装现有的处理程序.

对于'push'函数,最有用的部分是aleph的异步处理程序.它建立在netty之上,而不是一个连接一个线程模型,因此服务器端不需要担心tcp连接数.

一些实现细节:

  • 服务器端使用aysnc处理程序,保存所有客户端连接(通道)
  • 在60(例如)秒内,如果没有'新数据',则发送空响应
  • 如果服务器端有响应,请发送它.
  • 客户端可以简单地向服务器发送正常的http请求
  • 当客户端获得响应时,处理响应主体,然后再次重新发送http请求
  • 请检查客户端和所有代理服务器以设置正确的超时值

这里有更多方法:http://en.wikipedia.org/wiki/Push_technology


Dan*_*eal 5

我最近一直在尝试图书馆Chord,我真的很喜欢它.

它在http-kit中围绕Websocket支持提供了一个小的core.async包装器.

从github页面:

在服务器上

(:require [chord.http-kit :refer [with-channel]]
          [clojure.core.async :refer [<! >! put! close! go]])

(defn your-handler [req]
  (with-channel req ws-ch
    (go
      (let [{:keys [message]} (<! ws-ch)]
        (println "Message received:" message)
        (>! ws-ch "Hello client from server!")
        (close! ws-ch)))))
Run Code Online (Sandbox Code Playgroud)

在客户端上

(:require [chord.client :refer [ws-ch]]
          [cljs.core.async :refer [<! >! put! close!]])
(:require-macros [cljs.core.async.macros :refer [go]])

(go
  (let [ws (<! (ws-ch "ws://localhost:3000/ws"))]
    (>! ws "Hello server from client!")))
Run Code Online (Sandbox Code Playgroud)

我认为它仍处于早期阶段 - 它还没有解决断线问题.


小智 4

我现在开发了一个项目,我有完全相同的要求。我结合使用基础服务core.async来实施 SSE,效果非常好。

不幸的是,我现在无法开源这项工作,但基本上,我做了类似下面的代码片段的事情,只是由于身份验证而变得更加复杂。(从浏览器在 SSE 中进行身份验证并不是特别容易,因为您无法在new EventSource(SOME_URI);调用中传递任何自定义标头。

所以片段:

(ns chat-service.service
  (:require [clojure.set :as set]
            [clojure.core.async :as async :refer [<!! >!! <! >!]]
            [cheshire.core :as json]
            [io.pedestal.service.http :as bootstrap]
            [io.pedestal.service.log :as log]
            [io.pedestal.service.http.route :as route]
            [io.pedestal.service.http.sse :as sse]
            [io.pedestal.service.http.route.definition :refer [defroutes]]))

(def ^{:private true :doc "Formatting opts"} json-opts {:date-format "MMM dd, yyyy HH:mm:ss Z"})

(def ^{:private true :doc "Users to notification channels"} subscribers->notifications (atom {}))

 ;; private helper functions
(def ^:private generate-id #(.toString (java.util.UUID/randomUUID)))

(defn- sse-msg [event msg-data]
  {:event event :msg msg-data})

;; service functions    
(defn- remove-subscriber
  "Removes transport channel from atom subscribers->notifications and tears down
   SSE connection."
  [transport-channel context]
  (let [subscriber (get (set/map-invert @subscribers->notifications) transport-channel)]
    (log/info :msg (str "Removing SSE connection for subscriber with ID : " subscriber))
    (swap! subscribers->notifications dissoc subscriber)
    (sse/end-event-stream context)))

(defn send-event
  "Sends updates via SSE connection, takes also transport channel to close it
   in case of the exception."
  [transport-channel context {:keys [event msg]}]
  (try
    (log/info :msg "calling event sending fn")
    (sse/send-event context event (json/generate-string msg json-opts))
    (catch java.io.IOException ioe
      (async/close! transport-channel))))

(defn create-transport-channel
  "Creates transport channel with receiving end pushing updates to SSE connection.
   Associates this transport channel in atom subscribers->notifications under random
   generated UUID."
  [context]
  (let [temporary-id (generate-id)
        channel (async/chan)]
    (swap! subscribers->notifications assoc temporary-id channel)
    (async/go-loop []
      (when-let [payload (<! channel)]
        (send-event channel context payload)
        (recur))
      (remove-subscriber channel context))
    (async/put! channel (sse-msg "eventsourceVerification"
                                 {:handshakeToken temporary-id}))))

(defn subscribe
  "Subscribes anonymous user to SSE connection. Transport channel with timeout set up
   will be created for pushing any new data to this connection."
  [context]
  (create-transport-channel context))

(defroutes routes
  [[["/notifications/chat"
     {:get [::subscribe (sse/start-event-stream subscribe)]}]]])

(def service {:env :prod
              ::bootstrap/routes routes
              ::bootstrap/resource-path "/public"
              ::bootstrap/type :jetty
              ::bootstrap/port 8081})
Run Code Online (Sandbox Code Playgroud)

我遇到的一个“问题”是基座处理断开的 SSE 连接的默认方式。

由于计划的心跳作业,每当连接断开并且您没有调用 end-event-stream 上下文时,它都会记录异常。

我希望有一种方法可以禁用/调整此行为,或者至少提供我自己的拆卸函数,每当心跳作业因 EofException 失败时就会调用该函数。