我正在寻找一种惯用的方法来做到以下几点.我有一个http服务器,在特定的GET请求上响应消息流.现在,由于此消息是非终止的,当我使用clj-http/get时,调用会永远阻塞(我正在使用LightTable).我想建立一个回调或core.async样式的通道来对消息进行一些操作.即使将流写入文件对我来说也是一个很好的第一步.有什么指针吗?这是电话:
(require '[clj-http.client :as client])
(def url "http://hopey.netfonds.no/tradedump.php?date=20150508&paper=AAPL.O&csv_format=txt")
(client/get url)
Run Code Online (Sandbox Code Playgroud)
日期必须更改为今天的数据流日期.谢谢!
要将流写入文件,一种简单的方法是使用clojure.java.io/copy(它采用输入流,例如返回(:body (client/get some-url {:as :stream}))的输入流和输出流以及从一个到另一个的副本).就像是
(ns http-stream
(:require [clj-http.client :as client]
[clojure.java.io :as io]))
(with-open [in-stream (:body (client/get "http://hopey.netfonds.no/tradedump.php?date=20150508&paper=AAPL.O&csv_format=txt" {:as :stream}))
out-stream (->> "streamoutput.txt"
io/as-file
io/output-stream)]
(io/copy in-stream out-stream))
Run Code Online (Sandbox Code Playgroud)
这让我在几秒钟内获得了数千行标签分隔值.现在,要在行级别使用core.async处理它们,我们可能希望使用a reader和a 处理流更多一点line-seq:
(ns http-stream
(:require [clj-http.client :as client]
[clojure.core.async :as async]
[clojure.java.io :as io]
[clojure.string :as str]))
(defn trades-chan
"Open the URL as a stream of trades information. Return a channel of the trades, represented as strings."
[dump-url]
(let[lines (-> dump-url
(client/get {:as :stream})
:body
io/reader
line-seq) ];;A lazy seq of each line in the stream.
(async/to-chan lines))) ;;Return a channel which outputs the lines
;;Example: Print the first 250 lines.
(let [a (trades-chan "http://hopey.netfonds.no/tradedump.php?date=20150508&paper=AAPL.O&csv_format=txt")]
(async/go-loop [takes 250]
(when (< 0 takes)
(println (async/<! a))
(recur (dec takes)))))
Run Code Online (Sandbox Code Playgroud)
现在,有了这个,你很大程度上已经启动了,但是我注意到流总是以对列的描述开始
time price quantity board source buyer seller initiator
Run Code Online (Sandbox Code Playgroud)
你可以用它作为改善一点点的机会.特别是,这足以为交易建立一个传感器的信息,可以将交易变成更方便的格式,如地图.此外,我们可能想要一种方法来停止使用元素并在某个时候关闭连接.我自己并不熟悉core.async,但这似乎有效:
(defn trades-chan
"Open the URL as a tab-separated values stream of trades.
Returns a core.async channel of the trades, represented as maps.
Closes the HTTP stream on channel close!"
[dump-url]
(let[stream (-> dump-url
(client/get {:as :stream})
:body)
lines (-> stream
io/reader
line-seq) ;;A lazy seq of each line in the stream.
fields (map keyword (str/split (first lines) #"\t")) ;; (:time :price :quantity ...
transducer (map (comp #(zipmap fields %) #(str/split % #"\t"))) ;;A transducer that splits strings on tab and makes them into maps with keys from fields
output-chan (async/chan 50 transducer)]
(async/go-loop [my-lines (drop 1 lines)]
(if (async/>! output-chan (first my-lines)) ;;If we managed to put
(recur (rest my-lines)) ;;then the chan is not closed. Recur with the rest of the lines.
(.close stream))) ;;else close the HTTP stream.
output-chan))
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1226 次 |
| 最近记录: |