用于将数据处理表示为管道的框架

Eri*_*ikR 16 haskell functional-programming pipeline pipe data-processing

大多数数据处理可以被设想为组件的流水线,一个组件的输出馈入另一个组件的输入.典型的处理管道是:

reader | handler | writer
Run Code Online (Sandbox Code Playgroud)

作为开始讨论的一个衬托,让我们考虑一个面向对象的实现这个管道,其中每个段都是一个对象.该handler对象包含对readerwriter对象的引用,并具有如下所示的run方法:

define handler.run:
  while (reader.has_next) {
    data = reader.next
    output = ...some function of data...
    writer.put(output)
  }
Run Code Online (Sandbox Code Playgroud)

示意性依赖关系是:

reader <- handler -> writer
Run Code Online (Sandbox Code Playgroud)

现在假设我想在读者和处理程序之间插入一个新的管道段:

reader | tweaker | handler | writer
Run Code Online (Sandbox Code Playgroud)

同样,在这个OO实现中,tweaker将是reader对象的包装器,并且tweaker方法可能看起来像(在一些伪命令性代码中):

define tweaker.has_next:
  return reader.has_next

define tweaker.next:
  value = reader.next
  result = ...some function of value...
  return result
Run Code Online (Sandbox Code Playgroud)

我发现这不是一个非常可组合的抽象.一些问题是:

  1. tweaker只能在左侧使用handler,即我不能使用上面的实现tweaker来形成这个管道:

    读者| 处理程序| tweaker | 作家

  2. 我想利用管道的关联属性,以便这个管道:

    读者| 处理程序| 作家

可以表达为:

reader | p
Run Code Online (Sandbox Code Playgroud)

p管道在哪里handler | writer.在这个OO实现中,我将不得不部分实例化该handler对象

  1. 稍微重述(1),对象必须知道它们是否"推"或"拉"数据.

我正在寻找一个框架(不一定是OO)来创建解决这些问题的数据处理管道.

我已经标记这Haskellfunctional programming因为我觉得函数式编程的概念在这里可能是有用的.

作为一个目标,能够创建这样的管道会很好:

                     handler1
                   /          \
reader | partition              writer
                   \          /
                     handler2
Run Code Online (Sandbox Code Playgroud)

从某些角度来看,Unix shell管道通过以下实现决策解决了很多这些问题:

  1. 管道组件在不同的进程中异步运行

  2. 管道对象调解"推动器"和"拉动器"之间的传递数据; 也就是说,它们会阻止编写数据过快的编写者和尝试阅读速度过快的读者.

  3. 您可以使用特殊的连接器<>无源器件(即文件)连接到管道

我对代理中不使用线程或消息传递的方法特别感兴趣.也许这是最好的方法,但我想尽可能避免线程化.

谢谢!

luq*_*qui 20

是的,几乎肯定是你的男人.

我怀疑你是Haskell的新手,只是基于你在问题中所说的各种事情.箭头可能看起来相当抽象,特别是如果你正在寻找的是一个"框架".我知道我花了一段时间才能真正了解箭头发生的事情.

因此,您可以查看该页面并说"是的,这看起来像我想要的",然后发现自己相当失去了如何开始使用箭头来解决问题.所以这里有一点指导,所以你知道你在看什么.

箭无法解决您的问题.相反,它们会为您提供一种语言,您可以使用该语言来表达您的问题.您可能会发现某些预定义的箭头可以完成这项任务 - 可能会有一些kleisli箭头 - 但是在一天结束时您将要实现一个箭头(预定义的箭头只是为您提供实现它们的简单方法),它们表达了你的意思是"数据处理器".作为一个几乎无足轻重的例子,假设您希望通过简单的函数实现数据处理器.你会写:

newtype Proc a b = Proc { unProc :: a -> b }

-- I believe Arrow has recently become a subclass of Category, so assuming that.

instance Category Proc where
    id = Proc (\x -> x)
    Proc f . Proc g = Proc (\x -> f (g x))

instance Arrow Proc where
    arr f = Proc f
    first (Proc f) = Proc (\(x,y) -> (f x, y))
Run Code Online (Sandbox Code Playgroud)

这使您可以使用各种箭头组合子的机械(***),(&&&),(>>>)等,以及箭头符号这是相当不错的,如果你正在做复杂的事情.因此,正如Daniel Fischer在评论中指出的那样,您在问题中描述的管道可以表示为:

reader >>> partition >>> (handler1 *** handler2) >>> writer
Run Code Online (Sandbox Code Playgroud)

但很酷的是,它取决于您对处理器的意义.使用不同的处理器类型,可以以类似的方式实现您提到的关于每个处理器分叉线程的内容:

newtype Proc' a b = Proc (Source a -> Sink b -> IO ())
Run Code Online (Sandbox Code Playgroud)

然后适当地实施组合器.

这就是你所看到的:一个用于讨论组合过程的词汇表,它有一些代码可以重用,但主要是帮助指导你的思考,因为你实现这些组合器来定义在你的域中有用的处理器.

我最初的一个重要的Haskell项目之一是实现量子纠缠箭头 ; 那个项目让我真正开始理解Haskell的思维方式,这是我编程生涯中的一个重要转折点.也许你的这个项目会为你做同样的事情吗?:-)


Hei*_*mus 7

由于延迟评估,我们可以用Haskell中的普通函数组合来表达管道.这是一个计算文件中行的最大长度的示例:

main = interact (show . maximum . map length . lines)
Run Code Online (Sandbox Code Playgroud)

这里的所有东西都是普通的功能,比如说

lines :: String -> [String]
Run Code Online (Sandbox Code Playgroud)

但是由于懒惰的评估,这些函数只是递增地处理输入而且只需要处理所需的内容,就像UNIX管道一样.