Sve*_*enK 7 haskell nlp conduit
我写了一个计算语料库中NGrams频率的程序.我已经有一个功能消耗了一个令牌流并产生了一个单一订单的NGrams:
ngram :: Monad m => Int -> Conduit t m [t]
trigrams = ngram 3
countFreq :: (Ord t, Monad m) => Consumer [t] m (Map [t] Int)
目前我只能将一个流消费者连接到流源:
tokens --- trigrams --- countFreq
如何将多个流使用者连接到同一个流源?我想要这样的东西:
           .--- unigrams --- countFreq
           |--- bigrams  --- countFreq
tokens ----|--- trigrams --- countFreq
           '--- ...      --- countFreq
一个优点是并行运行每个消费者
编辑: 感谢Petr,我提出了这个解决方案
spawnMultiple orders = do
    chan <- atomically newBroadcastTMChan
    results <- forM orders $ \_ -> newEmptyMVar
    threads <- forM (zip results orders) $
                        forkIO . uncurry (sink chan)
    forkIO . runResourceT $ sourceFile "test.txt"
                         $$ javascriptTokenizer
                         =$ sinkTMChan chan
    forM results readMVar
    where
        sink chan result n = do
            chan' <- atomically $ dupTMChan chan
            freqs <- runResourceT $ sourceTMChan chan'
                                 $$ ngram n
                                 =$ frequencies
            putMVar result freqs
我假设您希望所有接收器都能接收所有值.
我建议:
newBroadcastTMChan创建一个新的通道Control.Concurrent.STM.TMChan(STM-CHANS).sinkTBMChanfrom Data.Conduit.TMChan(stm-conduit)为主生产者构建接收器.dupTMChan创建自己的副本进行阅读.使用启动将读取此副本的新线程sourceTBMChan.(我没试过,让我们知道它是如何工作的.)
更新:收集结果的一种方法是MVar为每个消费者线程创建一个.它们中的每一个都会putMVar在它结束后产生.你的主线程将takeMVar在所有这些上MVar,因此等待每个线程完成.例如,如果vars是您的MVars 的列表,主线程将发出mapM takeMVar vars以收集所有结果.
| 归档时间: | 
 | 
| 查看次数: | 497 次 | 
| 最近记录: |