Mar*_*ema 9 clojure core.async
我有一个clojure处理应用程序,它是一个渠道管道.每个处理步骤都异步进行计算(即使用http-kit或其他东西发出http请求),并将结果放在输出通道上.这样,下一步可以从该通道读取并进行计算.
我的主要功能看起来像这样
(defn -main [args]
(-> file/tmp-dir
(schedule/scheduler)
(search/searcher)
(process/resultprocessor)
(buy/buyer)
(report/reporter)))
Run Code Online (Sandbox Code Playgroud)
目前,调度程序步骤驱动管道(它没有输入通道),并为链提供工作负载.
当我在REPL中运行它时:
(-main "some args")
Run Code Online (Sandbox Code Playgroud)
由于调度程序无穷大,它基本上可以永久运行.改变这种架构的最佳方法是什么,以便我可以从REPL关闭整个系统?关闭每个通道是否意味着系统终止?
某些广播频道有帮助吗?
您可以在kill通道和管道的输入通道上使用scheduler alts!
/ alts!!
:
(def kill-channel (async/chan))
(defn scheduler [input output-ch kill-ch]
(loop []
(let [[v p] (async/alts!! [kill-ch [out-ch (preprocess input)]]
:priority true)]
(if-not (= p kill-ch)
(recur))))
Run Code Online (Sandbox Code Playgroud)
设置值kill-channel
将终止循环.
从技术上讲,你也可以output-ch
用来控制进程(放入关闭的通道返回false
),但我通常会发现显式的kill通道更清洁,至少对于顶级管道.
为了使事物更加优雅和方便使用(在REPL和生产中),您可以使用Stuart Sierra的组件,assoc
在组件的start
方法中启动调度程序循环(在单独的线程上)和组件上的kill通道然后close!
在组件的stop
方法中杀死通道(从而终止循环).
我建议使用类似https://github.com/stuartsierra/component 的东西来处理系统设置。它确保您可以轻松地在 REPL 中启动和停止系统。使用该库,您可以对其进行设置,以便每个处理步骤都是一个组件,每个组件将处理其start
和stop
协议中通道的设置和拆卸。此外,您可能会IStream
为每个要实现的组件创建一个协议,并使每个组件依赖于实现该协议的组件。它为您提供了一些非常简单的模块化。
您最终会得到一个如下所示的系统:
(component/system-map
:scheduler (schedule/new-scheduler file/tmp-dir)
:searcher (component/using (search/searcher)
{:in :scheduler})
:processor (component/using (process/resultprocessor)
{:in :searcher})
:buyer (component/using (buy/buyer)
{:in :processor})
:report (component/using (report/reporter)
{:in :buyer}))
Run Code Online (Sandbox Code Playgroud)
这种方法的一个好处是,如果组件也依赖于通道,您可以轻松添加它们。例如,如果每个组件都使用tap
内部的a 创建其输出通道mult
,则您可以仅通过将处理器作为依赖项的日志记录组件为处理器添加一个记录器。
:processor (component/using (process/resultprocessor)
{:in :searcher})
:processor-logger (component/using (log/logger)
{:in processor})
Run Code Online (Sandbox Code Playgroud)
我建议您也观看他的演讲,以了解其工作原理。