core.async 与分区状态传感器不保持状态?

joe*_*mct 2 clojure core.async

(我之前有一个问题在这里,并假设我不会遇到 core.async 问题)

给定如下输入数据:

(require '[clojure.core.async :as a])

(def input-data
  [{:itm_na 1 :seq_no 1  :doc_img "this is a very long "}
   {:itm_na 1 :seq_no 2  :doc_img "sentence from a mainframe "}
   {:itm_na 1 :seq_no 3  :doc_img "system that was built before i was "}
   {:itm_na 1 :seq_no 4  :doc_img "born."}
   {:itm_na 2 :seq_no 1  :doc_img "this is a another very long "}
   {:itm_na 2 :seq_no 2  :doc_img "sentence from the same mainframe "}
   {:itm_na 3 :seq_no 1  :doc_img "Ok here we are again. "}
   {:itm_na 3 :seq_no 2  :doc_img "The mainframe only had 40 char per field so"}
   {:itm_na 3 :seq_no 3  :doc_img "they broke it into multiple rows "}
   {:itm_na 3 :seq_no 4  :doc_img "which seems to be common"}
   {:itm_na 3 :seq_no 5  :doc_img " for the time. "}
   {:itm_na 3 :seq_no 6  :doc_img "thanks for your help."}])
Run Code Online (Sandbox Code Playgroud)

partition-by(如预期)将我的数据集中到 seq 中(以便稍后折叠):

(count (partition-by :itm_na input-data ))
;;=> 3
Run Code Online (Sandbox Code Playgroud)

但是,当我出于某种原因尝试使用管道执行此操作时,core.async它似乎没有做同样的事情...在异步管道中时,如何获得实际保留状态的 有状态传感器部分?partition-by

(let
    [source-chan (a/to-chan input-data)
     target-chan (a/chan 100)
     xf (comp (partition-by :itm_na))
     ]
  (a/pipeline 1
              target-chan
              xf
              source-chan)
  (count (<!! (a/into [] target-chan))))

;;=>12
Run Code Online (Sandbox Code Playgroud)

这应该是3?

奇怪的是,当我将其绑定xf到如下所示的通道时,我得到了预期的结果。我不确定为什么a/pipeline行为不同。

(let [xf (comp (partition-by :itm_na))
      ch (a/chan 1 xf)]
  (a/onto-chan ch input-data)
  (count (<!! (a/into [] ch))))
=>3
Run Code Online (Sandbox Code Playgroud)

从文档中...提到了有状态的位:

(clojure.repl/doc partition-by) 
-------------------------
clojure.core/partition-by
([f] [f coll])
  Applies f to each value in coll, splitting it each time f returns a
   new value.  Returns a lazy seq of partitions.  Returns a stateful
   transducer when no collection is provided.
Run Code Online (Sandbox Code Playgroud)

Ole*_*Cat 5

Rich Hickey 在他的演讲中简要强调了这个特殊情况:你不能使用pipeline有状态的转换器,主要是因为pipeline的并行性质。