使用clojure core.async管道处理错误

Mic*_*ini 6 error-handling pipeline clojure core.async

我试图了解使用core.async/pipeline处理错误的正确方法是什么,我的管道如下:

input     --> xf-run-computation --> first-out
first-out --> xf-run-computation --> last-out
Run Code Online (Sandbox Code Playgroud)

xf-run-computationhttp会在哪里进行调用并返回响应.但是,其中一些响应将返回错误.处理这些错误的最佳方法是什么?我的解决办法是在分裂输出通道success-valueserror-values,然后将它们合并回一个信道:

(let [[success-values1 error-values1] (split fn-to-split first-out)
      [success-values2 error-values2] (split fn-to-split last-out)
      errors (merge [error-values1 error-values2])]
(pipeline 4 first-out xf-run-computation input)
(pipeline 4 last-out  xf-run-computation success-values1)
[last-out errors])
Run Code Online (Sandbox Code Playgroud)

所以我的函数将返回最后的结果和错误.

sch*_*eho 6

一般来说,"正确"的方法可能取决于您的应用需求,但考虑到您的问题描述,我认为您需要考虑三件事:

  1. xf-run-computation 返回业务逻辑将看作错误的数据,
  2. xf-run-computation 抛出异常并且
  3. 鉴于涉及http调用,一些xf-run-computation可能永远不会完成(或没有及时完成).

关于第3点,你应该考虑的第一件事是使用pipeline-blocking而不是pipeline.

我认为你的问题主要与第1点有关.基本思想是xf-run-computation需要返回数据结构(例如地图或记录)的结果,这清楚地将结果标记为错误或成功,例如{:title nil :body nil :status "error"}.这将为您提供处理这种情况的一些选择:

  • 所有后来的代码都忽略了输入数据:status "error".即,你xf-run-computation会包含一条线(when (not (= (:status input) "error")) (run-computation input)),

  • 您可以根据需要对pipeline-calls和filter它们之间的所有结果运行过滤器(注意,filter它也可以用作管道中的传感器,从而消除core.async 的旧功能filter>filter<功能),

  • 你按照async/split你的建议使用/ Alan Thompson在他的回答中显示将错误值过滤到一个单独的错误通道.如果您要合并值,则无需为第二个管道设置第二个错误通道,您只需重新使用错误通道即可.

对于第2点,问题是任何异常xf-run-computation都发生在另一个线程中,并且不会简单地传播回您的调用代码.但是你可以使用(和)的ex-handler参数.您可以简单地过滤掉所有异常,将结果放在单独的异常通道上或尝试捕获它们并将它们转换为错误(可能将它们放回结果或其他错误通道) - 后者只有在exception为您提供了足够的信息,例如id或允许将异常绑定到导致异常的输入的东西.你可以安排这个(即从第三方库中引发的任何异常,如http调用).pipelinepipeline-blockingxf-run-computationcatch

对于第3点,core.async中的规范答案将指向一个timeout频道,但这与之相关并没有太大意义pipeline.一个更好的想法是,以确保您的HTTP调用超时设置,例如:timeoutHTTP-KIT的选项或:socket-timeout:conn-timeoutCLJ-HTTP的.请注意,这些选项通常会导致超时异常.