在Repa数组上的并行mapM

bga*_*ari 87 arrays parallel-processing haskell repa

在我最近的工作Gibbs sampling,我一直在充分利用RVar它,在我看来,它为随机数生成提供了一个近乎理想的界面.遗憾的是,由于无法在地图中使用monadic动作,我无法使用Repa.

虽然一般来说明显的monadic地图不能并行化,但在我看来,这RVar可能至少是monad的一个例子,其中效果可以安全地并行化(至少在原则上;我对内部工作原理并不十分熟悉RVar) .也就是说,我想写下面的内容,

drawClass :: Sample -> RVar Class
drawClass = ...

drawClasses :: Array U DIM1 Sample -> RVar (Array U DIM1 Class)
drawClasses samples = A.mapM drawClass samples
Run Code Online (Sandbox Code Playgroud)

这里A.mapM看起来是这样的,

mapM :: ParallelMonad m => (a -> m b) -> Array r sh a -> m (Array r sh b)
Run Code Online (Sandbox Code Playgroud)

虽然这显然是如何工作的,但关键取决于其实现RVar及其基础RandomSource,原则上人们会认为这将涉及为每个产生的线程绘制一个新的随机种子并照常进行.

直觉上,似乎同样的想法可能会推广到其他一些monad.

所以,我的问题是:是否可以构建一个ParallelMonadmonad 类,其效果可以安全地并行化(可能至少有人居住RVar)?

它看起来像什么?还有哪些monad可能会在这个课程中出现?还有其他人考虑过如何在维修中使用它的可能性吗?

最后,如果并行monadic动作的这个概念不能概括,那么有没有人看到任何好的方法来使这个工作在特定的情况下RVar(它将非常有用)?放弃RVar并行是一个非常困难的权衡.

leh*_*ins 8

问这个问题已经7年了,似乎仍然没有人想出一个很好的解决方案。Repa 没有类似mapM/ 的traverse功能,甚至可以在没有并行化的情况下运行。此外,考虑到过去几年取得的进展,它似乎也不太可能发生。

由于 Haskell 中许多数组库的陈旧状态以及我对它们的功能集的总体不满,我将几年的工作投入到数组库中massiv,该库借鉴了 Repa 的一些概念,但将其提升到了一个完全不同的水平。介绍就够了。

在今天之前,有 3 个类似 monadic map 的函数massiv(不包括类似函数的同义词:imapMforM等):

  • mapM- 任意的通常映射Monad。由于显而易见的原因不可并行化,而且速度也有点慢(与mapM列表中的通常情况一样)
  • traversePrim- 这里我们仅限于PrimMonad,它比 快得多mapM,但其原因对于本次讨论并不重要。
  • mapIO- 这个,顾名思义,仅限于IO(或者更确切地说MonadUnliftIO,但这无关紧要)。因为我们在其中,IO我们可以自动将数组拆分为与核心数量一样多的块,并使用单独的工作线程将操作映射到IO这些块中的每个元素上。与fmap也是可并行化的pure 不同,IO由于调度的不确定性以及映射操作的副作用,我们必须在这里。

所以,一旦我读到这个问题,我就想这个问题实际上已经解决了massiv,但没有那么快。随机数生成器,例如 inmwc-random和其他 inrandom-fu不能在多个线程中使用相同的生成器。这意味着,我唯一缺少的拼图是:“为每个产生的线程绘制一个新的随机种子并照常进行”。换句话说,我需要两件事:

  • 一个可以初始化与工作线程数量一样多的生成器的函数
  • 以及一个抽象,它将根据操作正在运行的线程无缝地为映射函数提供正确的生成器。

所以这正是我所做的。

首先,我将使用特制的randomArrayWSinitWorkerStates函数给出示例,因为它们与问题更相关,然后转向更通用的 monadic 映射。以下是它们的类型签名:

randomArrayWS ::
     (Mutable r ix e, MonadUnliftIO m, PrimMonad m)
  => WorkerStates g -- ^ Use `initWorkerStates` to initialize you per thread generators
  -> Sz ix -- ^ Resulting size of the array
  -> (g -> m e) -- ^ Generate the value using the per thread generator.
  -> m (Array r ix e)
Run Code Online (Sandbox Code Playgroud)
initWorkerStates :: MonadIO m => Comp -> (WorkerId -> m s) -> m (WorkerStates s)
Run Code Online (Sandbox Code Playgroud)

对于那些不熟悉 的人massivComp参数是一种要使用的计算策略,值得注意的构造函数是:

  • Seq - 按顺序运行计算,不分叉任何线程
  • Par - 启动尽可能多的线程,并使用这些线程来完成工作。

mwc-random最初将使用包作为示例,然后移至RVarT

?> import Data.Massiv.Array
?> import System.Random.MWC (createSystemRandom, uniformR)
?> import System.Random.MWC.Distributions (standard)
?> gens <- initWorkerStates Par (\_ -> createSystemRandom)
Run Code Online (Sandbox Code Playgroud)

上面我们使用系统随机性为每个线程初始化了一个单独的生成器,但是我们也可以通过从WorkerId参数中派生出每个线程的种子来使用唯一的生成器,该参数仅仅Int是工作人员的索引。现在我们可以使用这些生成器来创建一个具有随机值的数组:

?> randomArrayWS gens (Sz2 2 3) standard :: IO (Array P Ix2 Double)
Array P Par (Sz (2 :. 3))
  [ [ -0.9066144845415213, 0.5264323240310042, -1.320943607597422 ]
  , [ -0.6837929005619592, -0.3041255565826211, 6.53353089112833e-2 ]
  ]
Run Code Online (Sandbox Code Playgroud)

通过使用Par策略,scheduler库将在可用工作人员之间平均分配生成工作,每个工作人员将使用自己的生成器,从而使其线程安全。没有什么可以阻止我们重复使用相同的WorkerStates任意次数,只要不是并发完成,否则会导致异常:

?> randomArrayWS gens (Sz1 10) (uniformR (0, 9)) :: IO (Array P Ix1 Int)
Array P Par (Sz1 10)
  [ 3, 6, 1, 2, 1, 7, 6, 0, 8, 8 ]
Run Code Online (Sandbox Code Playgroud)

现在放在mwc-random一边,我们可以通过使用以下函数将相同的概念重用于其他可能的用例generateArrayWS

generateArrayWS ::
     (Mutable r ix e, MonadUnliftIO m, PrimMonad m)
  => WorkerStates s
  -> Sz ix --  ^ size of new array
  -> (ix -> s -> m e) -- ^ element generating action
  -> m (Array r ix e)
Run Code Online (Sandbox Code Playgroud)

mapWS

mapWS ::
     (Source r' ix a, Mutable r ix b, MonadUnliftIO m, PrimMonad m)
  => WorkerStates s
  -> (a -> s -> m b) -- ^ Mapping action
  -> Array r' ix a -- ^ Source array
  -> m (Array r ix b)
Run Code Online (Sandbox Code Playgroud)

以下是关于如何将此功能与rvar,random-fumersenne-random-pure64库一起使用的承诺示例。我们也可以在randomArrayWS这里使用,但为了举例,假设我们已经有一个包含不同RVarTs的数组,在这种情况下我们需要一个mapWS

?> import Data.Massiv.Array
?> import Control.Scheduler (WorkerId(..), initWorkerStates)
?> import Data.IORef
?> import System.Random.Mersenne.Pure64 as MT
?> import Data.RVar as RVar
?> import Data.Random as Fu
?> rvarArray = makeArrayR D Par (Sz2 3 9) (\ (i :. j) -> Fu.uniformT i j)
?> mtState <- initWorkerStates Par (newIORef . MT.pureMT . fromIntegral . getWorkerId)
?> mapWS mtState RVar.runRVarT rvarArray :: IO (Array P Ix2 Int)
Array P Par (Sz (3 :. 9))
  [ [ 0, 1, 2, 2, 2, 4, 5, 0, 3 ]
  , [ 1, 1, 1, 2, 3, 2, 6, 6, 2 ]
  , [ 0, 1, 2, 3, 4, 4, 6, 7, 7 ]
  ]
Run Code Online (Sandbox Code Playgroud)

需要注意的是,尽管在上面的示例中使用了 Mersenne Twister 的纯实现,但我们无法逃避 IO。这是因为非确定性调度,这意味着我们永远不知道哪个工作人员将处理数组的哪个块,因此哪个生成器将用于数组的哪个部分。从好的方面来说,如果生成器是纯的且可拆分的,例如splitmix,那么我们可以使用纯的、确定性的和可并行化的生成函数:randomArray,但这已经是一个单独的故事了。


mca*_*dre 4

由于 PRNG 固有的顺序性质,这样做可能不是一个好主意。相反,您可能希望按如下方式转换代码:

  1. 声明一个 IO 函数(main,或者你有的东西)。
  2. 根据需要读取任意数量的随机数。
  3. 将(现在是纯的)数字传递到您的 repa 函数中。