如何在TVar上添加终结器

Tho*_*son 5 concurrency haskell transactional-memory

背景

在回答一个问题时,我构建并上传了一个有界tchan(我不适合上传jnb的版本).如果名称不够,则有界tchan(BTChan)是具有最大容量的STM信道(如果信道处于容量则写入块).

最近,我收到了添加像常规TChan一样的重复功能的请求.从而开始出现问题.

BTChan看起来如何

下面是BTChan的简化(实际上是非功能性)视图.

data BTChan a = BTChan
    { max :: Int
    , count :: TVar Int
    , channel :: TVar [(Int, a)]
    , nrDups  :: TVar Int
    }
Run Code Online (Sandbox Code Playgroud)

每次你写入频道时,你都会nrDups在元组中包含dups()的数量- 这是一个'单个元素计数器',它表示有多少读者获得了这个元素.

每个读者都会减少它读取的元素的计数器,然后将它的读指针移动到列表中的下一个元素.如果读取器将计数器递减到零,count则递减值以正确反映通道上的可用容量.

要明确所需的语义:通道容量表示通道中排队的最大元素数.任何给定元素排队,直到每个dup的读者都收到该元素.没有元素应该保留为GCed dup排队(这是主要问题).

例如,让三个容量为2的通道(c1,c2,c3)重复,其中2个项目写入通道,然后所有项目都从c1和读出c2.频道仍然已满(0剩余容量),因为c3尚未使用其副本.在任何时间点,如果所有引用c3都被删除(因此c3是GCed),那么应该释放容量(在这种情况下恢复为2).

这是问题所在: 假设我有以下代码

c <- newBTChan 1
_ <- dupBTChan c  -- This represents what would probably be a pathological bug or terminated reader
writeBTChan c "hello"
_ <- readBTChan c
Run Code Online (Sandbox Code Playgroud)

导致BTChan看起来像:

BTChan 1 (TVar 0) (TVar []) (TVar 1)             -->   -- newBTChan
BTChan 1 (TVar 0) (TVar []) (TVar 2)             -->   -- dupBTChan
BTChan 1 (TVar 1) (TVar [(2, "hello")]) (TVar 2) -->   -- readBTChan c
BTChan 1 (TVar 1) (TVar [(1, "hello")]) (TVar 2)       -- OH NO!
Run Code Online (Sandbox Code Playgroud)

注意到最后读数"hello"仍然是1?这意味着消息不被认为已经消失(即使它将在实际实现中获得GC)并且我们count永远不会减少.由于通道处于容量(最多1个元素),因此编写器将始终阻塞.

我希望每次dupBTChan调用时都会创建一个终结器.当收集重复(或原始)通道时,在该通道上剩余要读取的所有元素将使每个元素计数递减,同样该nrDups变量将递减.因此,将来的写入将具有正确的count(count不会为GCed通道未读取的变量保留空间).

解决方案1 ​​ - 手动资源管理(我想避免的)

由于这个原因,JNB的有界tchan实际上有手动资源管理.见cancelBTChan.我正在寻找一些更难以让用户出错的事情(并非在许多情况下手动管理不是正确的方法).

解决方案2 - 通过在TVars上阻止来使用例外(GHC无法按我的意愿执行此操作)

编辑这个解决方案,解决方案3只是一个分拆,不起作用!由于bug 5055(WONTFIX),GHC编译器向两个被阻塞的线程发送异常,即使一个就足够了(理论上可以确定,但GHC GC不实用).

如果获得a的所有方法BTChan都是IO,那么我们可以forkIO在一个特定的额外(虚拟)TVar字段上读取/重试的线程BTChan.当删除对TVar的所有其他引用时,新线程将捕获异常,因此它将知道何时减少nrDups和单个元素计数器.这应该工作,但强制我的所有用户使用IO来获取他们的BTChans:

data BTChan = BTChan { ... as before ..., dummyTV :: TVar () }

dupBTChan :: BTChan a -> IO (BTChan a)
dupBTChan c = do
       ... as before ...
       d <- newTVarIO ()
       let chan = BTChan ... d
       forkIO $ watchChan chan
       return chan

watchBTChan :: BTChan a -> IO ()
watchBTChan b = do
    catch (atomically (readTVar (dummyTV b) >> retry)) $ \e -> do
    case fromException e of
        BlockedIndefinitelyOnSTM -> atomically $ do -- the BTChan must have gotten collected
            ls <- readTVar (channel b)
            writeTVar (channel b) (map (\(a,b) -> (a-1,b)) ls)
            readTVar (nrDup b) >>= writeTVar (nrDup b) . (-1)
        _ -> watchBTChan b
Run Code Online (Sandbox Code Playgroud)

编辑:是的,这是一个糟糕的勒芒终结者,我没有任何特别的理由避免使用addFinalizer.这将是同样的解决方案,仍然迫使使用IO afaict.

解决方案3:比解决方案2更清洁的API,但GHC仍然不支持它

用户通过调用启动管理器线程initBTChanCollector,该线程将监视一组这些虚拟TVar(来自解决方案2)并进行所需的清理.基本上,它将IO推入另一个线程,该线程知道通过全局(unsafePerformIOed)做什么TVar.事情基本上像解决方案2,但BTChan的创建仍然可以是STM.无法运行initBTChanCollector会导致过程运行时任务列表(空间泄漏)不断增加.

解决方案4:永远不要丢弃BTChans

这类似于忽略了这个问题.如果用户从未删除重复,BTChan则问题将消失.

解决方案5 我看到了ezyang的答案(完全有效和赞赏),但我真的希望保持当前的API只有'dup'功能.

**解决方案6**请告诉我有更好的选择.

编辑:我实施了解决方案3(完全未经测试的alpha版本)并通过使全局本身成为一个潜在的空间泄漏BTChan- 陈可能应该具有1的容量,所以忘记运行init显示非常快,但这是一个小的改变.这适用于GHCi(7.0.3),但这似乎是偶然的.GHC抛出两个被阻塞线程的异常(读取BTChan的有效线程和观察线程)所以如果你在另一个线程丢弃它的引用然后你死的时候被阻止读取BTChan.

Edw*_*ang 5

这是另一种解决方案:要求对有界通道复制的所有访问都被一个在退出时释放其资源的函数(由异常或通常)括起来.您可以使用具有rank-2 runner的monad来防止重复的通道泄漏.它仍然是手动的,但类型系统使得做顽皮的事情变得更加困难.

你真的不想依赖真正的IO终结器,因为GHC不能保证何时可以运行终结器:你知道它可能要等到程序结束才能运行终结器,这意味着你已经陷入僵局直到那时.

  • 经过一段时间的考虑,我同意了.尝试将终结器用于除内存管理以外的任何其他内容是完全错误的 - 在Haskell中没有语义上"可观察"的效果.RAII的惯用Haskell等价物不是终结符,而是`with`函数. (2认同)