rxjava和clojure异步神秘:期货承诺和代理人,哦,我的

Reb*_*bin 8 clojure netflix reactive-programming rx-java

我提前为本说明的长度道歉.我花了相当多的时间把它做得更短,而且这个小到我能得到它.

我有一个谜,非常感谢你的帮助.这个谜团来自于observer我在Clojure中写的一个rxjava的行为,这个rxjava来自于几个简单observable的在线样本.

一个observable同步onNext向其观察者的处理程序发送消息,我所谓的原则观察者表现得如预期的那样.

另一个observable异步在另一个线程上通过Clojure执行相同的操作future.完全相同的观察者不会捕获发布到其上的所有事件onNext; 它似乎在尾部丢失了随机数量的消息.

在等待promised onCompleted的到期和等待发送给agent收集器的所有事件的到期之间存在有意的争用.如果promise获胜,我希望看到falseonCompleted,并在一个很短可能队列agent.如果agent获胜,我希望看到trueonCompleted,所有的消息来自agent的队列.我不希望的一个结果是true来自onCompletedAND的短队列agent.但是,墨菲没有睡觉,而这正是我所看到的.我不知道垃圾收集是否有问题,或者是否有一些内部排队到Clojure的STM,或者我的愚蠢,或者完全不同的东西.

我在这里以自包含形式的顺序呈现源代码,以便可以直接运行lein repl.有三个cermonials可以解决问题:首先,leiningen项目文件,project.clj声明依赖于0.9.0Netflix的rxjava版本:

(defproject expt2 "0.1.0-SNAPSHOT"
  :description "FIXME: write description"
  :url "http://example.com/FIXME"
  :license {:name "Eclipse Public License"
            :url "http://www.eclipse.org/legal/epl-v10.html"}
  :dependencies [[org.clojure/clojure               "1.5.1"]
                 [com.netflix.rxjava/rxjava-clojure "0.9.0"]]
  :main expt2.core)
Run Code Online (Sandbox Code Playgroud)

现在,命名空间和Clojure要求以及Java导入:

(ns expt2.core
  (:require clojure.pprint)
  (:refer-clojure :exclude [distinct])
  (:import [rx Observable subscriptions.Subscriptions]))
Run Code Online (Sandbox Code Playgroud)

最后,输出到控制台的宏:

(defmacro pdump [x]
  `(let [x# ~x]
     (do (println "----------------")
         (clojure.pprint/pprint '~x)
         (println "~~>")
         (clojure.pprint/pprint x#)
         (println "----------------")
         x#)))
Run Code Online (Sandbox Code Playgroud)

最后,给我的观察者.我使用an agent来收集任何可观察者发送的消息onNext.我用a atom来收集潜力onError.我使用的是promiseonCompleted让消费者外部观察者可以等待就可以了.

(defn- subscribe-collectors [obl]
  (let [;; Keep a sequence of all values sent:
        onNextCollector      (agent [])
        ;; Only need one value if the observable errors out:
        onErrorCollector     (atom nil)
        ;; Use a promise for 'completed' so we can wait for it on
        ;; another thread:
        onCompletedCollector (promise)]
    (letfn [;; When observable sends a value, relay it to our agent"
            (collect-next      [item] (send onNextCollector (fn [state] (conj state item))))
            ;; If observable errors out, just set our exception;
            (collect-error     [excp] (reset!  onErrorCollector     excp))
            ;; When observable completes, deliver on the promise:
            (collect-completed [    ] (deliver onCompletedCollector true))
            ;; In all cases, report out the back end with this:
            (report-collectors [    ]
              (pdump
               ;; Wait for everything that has been sent to the agent
               ;; to drain (presumably internal message queues):
               {:onNext      (do (await-for 1000 onNextCollector)
                                 ;; Then produce the results:
                                 @onNextCollector)
                ;; If we ever saw an error, here it is:
                :onError     @onErrorCollector
                ;; Wait at most 1 second for the promise to complete;
                ;; if it does not complete, then produce 'false'.
                ;; I expect if this times out before the agent
                ;; times out to see an 'onCompleted' of 'false'.
                :onCompleted (deref onCompletedCollector 1000 false)
                }))]
      ;; Recognize that the observable 'obl' may run on another thread:
      (-> obl
          (.subscribe collect-next collect-error collect-completed))
      ;; Therefore, produce results that wait, with timeouts, on both
      ;; the completion event and on the draining of the (presumed)
      ;; message queue to the agent.
      (report-collectors))))
Run Code Online (Sandbox Code Playgroud)

现在,这是一个同步可观察的.它向onNext观察者的喉咙发出25条信息,然后打电话给他们onCompleted.

(defn- customObservableBlocking []
  (Observable/create
    (fn [observer]                       ; This is the 'subscribe' method.
      ;; Send 25 strings to the observer's onNext:
      (doseq [x (range 25)]
        (-> observer (.onNext (str "SynchedValue_" x))))
      ; After sending all values, complete the sequence:
      (-> observer .onCompleted)
      ; return a NoOpSubsription since this blocks and thus
      ; can't be unsubscribed (disposed):
      (Subscriptions/empty))))
Run Code Online (Sandbox Code Playgroud)

我们订阅了这个观察者的观察者:

;;; The value of the following is the list of all 25 events:
(-> (customObservableBlocking)
    (subscribe-collectors))
Run Code Online (Sandbox Code Playgroud)

它按预期工作,我们在控制台上看到以下结果

{:onNext (do (await-for 1000 onNextCollector) @onNextCollector),
 :onError @onErrorCollector,
 :onCompleted (deref onCompletedCollector 1000 false)}
~~>
{:onNext
 ["SynchedValue_0"
  "SynchedValue_1"
  "SynchedValue_2"
  "SynchedValue_3"
  "SynchedValue_4"
  "SynchedValue_5"
  "SynchedValue_6"
  "SynchedValue_7"
  "SynchedValue_8"
  "SynchedValue_9"
  "SynchedValue_10"
  "SynchedValue_11"
  "SynchedValue_12"
  "SynchedValue_13"
  "SynchedValue_14"
  "SynchedValue_15"
  "SynchedValue_16"
  "SynchedValue_17"
  "SynchedValue_18"
  "SynchedValue_19"
  "SynchedValue_20"
  "SynchedValue_21"
  "SynchedValue_22"
  "SynchedValue_23"
  "SynchedValue_24"],
 :onError nil,
 :onCompleted true}
----------------
Run Code Online (Sandbox Code Playgroud)

这是一个异步的observable,完全相同的东西,只在一个future线程上:

(defn- customObservableNonBlocking []
  (Observable/create
    (fn [observer]                       ; This is the 'subscribe' method
      (let [f (future
                ;; On another thread, send 25 strings:
                (doseq [x (range 25)]
                  (-> observer (.onNext (str "AsynchValue_" x))))
                ; After sending all values, complete the sequence:
                (-> observer .onCompleted))]
        ; Return a disposable (unsubscribe) that cancels the future:
        (Subscriptions/create #(future-cancel f))))))

;;; For unknown reasons, the following does not produce all 25 events:
(-> (customObservableNonBlocking)
    (subscribe-collectors))
Run Code Online (Sandbox Code Playgroud)

但是,令人惊讶的是,这是我们在控制台上看到的内容:true因为onCompleted暗示promiseDID NOT TIME-OUT; 但只有一些异步消息.我们看到的实际消息数量因运行而异,这意味着存在一些并发现象.线索赞赏.

----------------
{:onNext (do (await-for 1000 onNextCollector) @onNextCollector),
 :onError @onErrorCollector,
 :onCompleted (deref onCompletedCollector 1000 false)}
~~>
{:onNext
 ["AsynchValue_0"
  "AsynchValue_1"
  "AsynchValue_2"
  "AsynchValue_3"
  "AsynchValue_4"
  "AsynchValue_5"
  "AsynchValue_6"],
 :onError nil,
 :onCompleted true}
----------------
Run Code Online (Sandbox Code Playgroud)

Ank*_*kur 7

await-for对代理机构阻止当前线程,直到所有行动派出迄今为止(从这个线程或代理)已经发生的代理商,这意味着它可能出现后您的await结束还有一些其他的线程,可以发送邮件到代理人,这就是你案件中发生的事情.在你的await代理已经结束并且你已经:onNext在地图中的密钥中对其值进行deref 之后,那么你等待完成的承诺,在等待之后结果是真的,但同时将一些其他消息发送给代理被收集到矢量中.

您可以通过将:onCompleted密钥作为映射中的第一个密钥来解决此问题,这基本上意味着等待完成,然后等待send代理程序,因为在已经收到onCompleted之后,代理程序上不再有调用.

{:onCompleted (deref onCompletedCollector 1000 false)
 :onNext      (do (await-for 0 onNextCollector)
                                 @onNextCollector)
 :onError     @onErrorCollector
 }
Run Code Online (Sandbox Code Playgroud)