具有部分原子性的STM用于某些TVars

dfl*_*str 7 synchronization haskell transactions ghc stm

我正在使用STM做事,除此之外还使用了TBQueue数据结构并取得了巨大的成功.我一直在使用它的一个有用功能包括根据a中的前提条件从中读取TVar,基本上是这样的:

shouldRead <- readTVar shouldReadVar
if shouldRead
  then do
    a <- readTBQueue queue
    doSomethingWith a
  else doSomethingElse
Run Code Online (Sandbox Code Playgroud)

如果我们假设在执行此块之前它queue是空的并且shouldReadVar包含True它,那么它将导致readTBQueue调用retry,并且当shouldReadVar包含Falsequeue包含元素时,块将被重新执行,无论先发生什么.


我现在需要一个同步通道数据结构,类似于本文中描述的结构(如果你想理解这个问题,请阅读它),除非它需要在前一个例子中的前置条件下可读,也可能与其他东西一起构成.

让我们SyncChan用它来定义这个数据结构writeSyncChanreadSyncChan定义操作.

这是一个可能的用例:这个(伪)代码(因为我混合了STM/IO概念,所以不会起作用):

shouldRead <- readTVar shouldReadVar
if shouldRead
  then do
    a <- readSyncChan syncChan
    doSomethingWith a
  else doSomethingElse
Run Code Online (Sandbox Code Playgroud)

假设当前没有其他线程在writeSyncChan调用上阻塞,并且shouldReadChan包含True,我希望块" retry"直到shouldReadChan包含False,或者在a上包含不同的线程块writeSyncChan.换句话说:当一个线程retry打开writeSyncChan而另一个线程块到达a readSyncChan,反之亦然时,我希望沿着通道传输该值.在所有其他情况下,双方都应处于某种retry状态,从而对变化做出反应shouldReadVar,以便取消读取或写入.

上面使用两个(T)MVars 链接的文章中描述的天真方法当然是不可能的.由于数据结构是同步的,因此无法在两个atomically块中使用它,因为您无法在原子上下文中更改一个TMVar并等待另一个块更改TMVar.

相反,我正在寻找一种部分原子性,我可以"提交"某个事务的某个部分,只在某些变量发生变化时回滚,而不是其他变量.如果我有"msg"和"ack"变量,就像上面文章中的第一个例子,我希望能够写入"msg"变量,然后等待值到达"ack",或者等待我的要更改的其他事务变量.如果其他事务变量发生更改,则应重试整个原子块,如果"ack"值到达,则事务应继续处于先前状态.对于阅读方面,应该发生类似的事情,除了我当然要从"msg"读取并写入"ack".

使用GHC STM可以做到这一点,还是需要进行手动MVar /回滚处理?

Gab*_*lez 3

这就是你想要的:

import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad

data SyncChan a = SyncChan (TMVar a) (TMVar ())

newSyncChan :: IO (SyncChan a)
newSyncChan = do
    msg <- newEmptyTMVarIO
    ack <- newEmptyTMVarIO
    return (SyncChan msg ack)

readIf :: SyncChan a -> TVar Bool -> STM (Maybe a)
readIf (SyncChan msg ack) shouldReadVar = do
    b <- readTVar shouldReadVar
    if b
        then do
            a <- takeTMVar msg
            putTMVar ack ()
            return (Just a)
        else return Nothing

write :: SyncChan a -> a -> IO ()
write (SyncChan msg ack) a = do
    atomically $ putTMVar msg a
    atomically $ takeTMVar ack

main = do
    sc <- newSyncChan
    tv <- newTVarIO True
    forkIO $ forever $ forM_ [False, True] $ \b -> do
        threadDelay 2000000
        atomically $ writeTVar tv b
    forkIO $ forM_ [0..] $ \i -> do
        putStrLn "Writing..."
        write sc i
        putStrLn "Write Complete"
        threadDelay 300000
    forever $ do
        putStrLn "Reading..."
        a <- atomically $ readIf sc tv
        print a
        putStrLn "Read Complete"
Run Code Online (Sandbox Code Playgroud)

这给出了您想要的行为。同时输入TVarTrue和输出端将相互同步。当TVar切换到False时,读取端自由中止并返回Nothing