管道:多流消费者

Sve*_*enK 7 haskell nlp conduit

我写了一个计算语料库中NGrams频率的程序.我已经有一个功能消耗了一个令牌流并产生了一个单一订单的NGrams:

ngram :: Monad m => Int -> Conduit t m [t]
trigrams = ngram 3
countFreq :: (Ord t, Monad m) => Consumer [t] m (Map [t] Int)
Run Code Online (Sandbox Code Playgroud)

目前我只能将一个流消费者连接到流源:

tokens --- trigrams --- countFreq
Run Code Online (Sandbox Code Playgroud)

如何将多个流使用者连接到同一个流源?我想要这样的东西:

           .--- unigrams --- countFreq
           |--- bigrams  --- countFreq
tokens ----|--- trigrams --- countFreq
           '--- ...      --- countFreq
Run Code Online (Sandbox Code Playgroud)

一个优点是并行运行每个消费者

编辑: 感谢Petr,我提出了这个解决方案

spawnMultiple orders = do
    chan <- atomically newBroadcastTMChan

    results <- forM orders $ \_ -> newEmptyMVar
    threads <- forM (zip results orders) $
                        forkIO . uncurry (sink chan)

    forkIO . runResourceT $ sourceFile "test.txt"
                         $$ javascriptTokenizer
                         =$ sinkTMChan chan

    forM results readMVar

    where
        sink chan result n = do
            chan' <- atomically $ dupTMChan chan
            freqs <- runResourceT $ sourceTMChan chan'
                                 $$ ngram n
                                 =$ frequencies
            putMVar result freqs
Run Code Online (Sandbox Code Playgroud)

Pet*_*lák 6

我假设您希望所有接收器都能接收所有值.

我建议:

  1. 使用newBroadcastTMChan创建一个新的通道Control.Concurrent.STM.TMChan(STM-CHANS).
  2. 使用此通道使用sinkTBMChanfrom Data.Conduit.TMChan(stm-conduit)为主生产者构建接收器.
  3. 为每个客户端使用dupTMChan创建自己的副本进行阅读.使用启动将读取此副本的新线程sourceTBMChan.
  4. 从线程中收集结果.
  5. 确保您的客户端能够在生成数据时尽快读取数据,否则可能会导致堆溢出.

(我没试过,让我们知道它是如何工作的.)


更新:收集结果的一种方法是MVar为每个消费者线程创建一个.它们中的每一个都会putMVar在它结束后产生.你的主线程将takeMVar在所有这些上MVar,因此等待每个线程完成.例如,如果vars是您的MVars 的列表,主线程将发出mapM takeMVar vars以收集所有结果.