Sve*_*enK 7 haskell nlp conduit
我写了一个计算语料库中NGrams频率的程序.我已经有一个功能消耗了一个令牌流并产生了一个单一订单的NGrams:
ngram :: Monad m => Int -> Conduit t m [t]
trigrams = ngram 3
countFreq :: (Ord t, Monad m) => Consumer [t] m (Map [t] Int)
Run Code Online (Sandbox Code Playgroud)
目前我只能将一个流消费者连接到流源:
tokens --- trigrams --- countFreq
Run Code Online (Sandbox Code Playgroud)
如何将多个流使用者连接到同一个流源?我想要这样的东西:
.--- unigrams --- countFreq
|--- bigrams --- countFreq
tokens ----|--- trigrams --- countFreq
'--- ... --- countFreq
Run Code Online (Sandbox Code Playgroud)
一个优点是并行运行每个消费者
编辑: 感谢Petr,我提出了这个解决方案
spawnMultiple orders = do
chan <- atomically newBroadcastTMChan
results <- forM orders $ \_ -> newEmptyMVar
threads <- forM (zip results orders) $
forkIO . uncurry (sink chan)
forkIO . runResourceT $ sourceFile "test.txt"
$$ javascriptTokenizer
=$ sinkTMChan chan
forM results readMVar
where
sink chan result n = do
chan' <- atomically $ dupTMChan chan
freqs <- runResourceT $ sourceTMChan chan'
$$ ngram n
=$ frequencies
putMVar result freqs
Run Code Online (Sandbox Code Playgroud)
我假设您希望所有接收器都能接收所有值.
我建议:
newBroadcastTMChan
创建一个新的通道Control.Concurrent.STM.TMChan
(STM-CHANS).sinkTBMChan
from Data.Conduit.TMChan
(stm-conduit)为主生产者构建接收器.dupTMChan
创建自己的副本进行阅读.使用启动将读取此副本的新线程sourceTBMChan
.(我没试过,让我们知道它是如何工作的.)
更新:收集结果的一种方法是MVar
为每个消费者线程创建一个.它们中的每一个都会putMVar
在它结束后产生.你的主线程将takeMVar
在所有这些上MVar
,因此等待每个线程完成.例如,如果vars
是您的MVar
s 的列表,主线程将发出mapM takeMVar vars
以收集所有结果.