在不使用锁的情况下以原子方式更改两个数字

Mih*_*kov 5 c# multithreading thread-safety

我正在创建驻留在内存中的仅追加数据结构,并将序列化为字节数组的记录追加到该内存。我需要它是线程安全的并且非常快,所以我想出了以下到目前为止运行良好的代码(这是一个伪代码,实际版本更复杂,并且做了其他一些工作,但这只是为了理解这一点)

public sealed class MemoryList : IDisposable
{
    private int nextOffset = 0;
    private readonly MemoryMappedFile file;
    private readonly MemoryMappedViewAccessor va;

    public MemoryList(uint capacity)
    {
        // Some checks on capacity here
        var mapName = Guid.NewGuid().ToString("N");
        this.file = MemoryMappedFile.CreateNew(mapName, capacity);
        this.va = file.CreateViewAccessor(0, capacity);
    }

    public void AppendMessage(byte[] messagePayload)
    {
        if (messagePayload == null) 
            throw new ArgumentNullException(nameof(messagePayload));
        if (messagePayload.Length == 0)
            throw new ArgumentOutOfRangeException(nameof(messagePayload));

        if (TryReserveCapacity(messagePayload.Length, out var offsetToWriteTo))
        {
            this.va.Write(offsetToWriteTo, messagePayload.Length);
            this.va.WriteArray(offsetToWriteTo + sizeof(int), messagePayload, 0, messagePayload.Length);
        }
    }

    private bool TryReserveCapacity(int dataLength, out long reservedOffset)
    {
        // reserve enough room to store data + its size
        var packetSize = sizeof(int) + dataLength;
        reservedOffset = Interlocked.Add(ref this.nextOffset, packetSize) - packetSize;

        if (this.nextOffset <= this.va.Capacity)
            return true;
        reservedOffset = -1;
        return false;
    }

    public void Dispose()
    {
        file?.Dispose();
        va?.Dispose();
    }
}
Run Code Online (Sandbox Code Playgroud)

这非常快,并且效果很好。无论我多么努力,我都无法打破它。

因此,现在我需要的是每个附加消息的TryReserveCapacity方法,以输出每个消息的逻辑索引。所以对于拳头消息获取索引0,第二个-指数1等,这导致使用两个调用Interlocked一个用于offset和一个用于messageIndex那些明显不是线程安全的,我可以用竞争条件导致以下情况结束。

MI:101,偏移:10000 MI:100,偏移:10500

关于如何保证没有一个MI会比另一个具有较大偏移量的MI大的想法?所有这些都没有使用任何锁?

因此,基本上,我们如何更改以下方法以使其正常运行?

private bool TryReserveCapacity(int dataLength, out long reservedOffset, out long messageId)
{
    // reserve enough room to store data + its size
    var packetSize = sizeof(int) + dataLength;
    reservedOffset = Interlocked.Add(ref this.nextOffset, packetSize) - packetSize;
    messageId = Interlocked.Increment(ref this.currentMessageId);

    if (this.nextOffset <= this.va.Capacity)
        return true;
    reservedOffset = -1;
    return false;
}
Run Code Online (Sandbox Code Playgroud)

PS我知道示例代码的字节序问题,但是正如我所说的,只是将其视为说明问题的伪代码。

Blu*_*rat 1

抱歉,如果这没有直接解决您的主要问题(非锁定原子性),但我发现您正在使用 和MemoryMappedFile类来操作内存映射文件MemoryMappedViewAccessor

我真的不知道 .NET Framework 的当前迭代是否已经解决了这个问题,但是在我们大约三年前编写的代码库中,我们发现使用这些类进行内存映射文件操作的性能非常差如果速度慢大约 7 倍)我没记错),与使用 Win32 API 和映射内存的直接指针操作相比,即使在托管C++/CLI 类中也是如此。

我强烈建议您测试一下这种方法,您可能会对性能提升感到惊讶(正如我们确实所做的那样),并且性能提升可能非常显着,以至于您可以负担标准锁定的成本来实现您想要的原子性。

如果您想探索这一途径,这里有一个代码片段,展示了该技术的基础知识。

Int32 StationHashStorage::Open() {
   msclr::lock lock(_syncRoot);
   if( _isOpen )
      return 0;
   String^ fileName = GetFullFileName();

   _szInBytes = ComputeFileSizeInBytes(fileName);
   String^ mapExtension = GetFileExtension();
   String^ mapName = String::Format("{0}{1}_{2}", _stationId, _date.ToString("yyyyMMdd"), mapExtension);

   marshal_context context;
   LPCTSTR pMapName = context.marshal_as<const TCHAR*>(mapName);

   {
      msclr::lock lock( _openLock );
         // Try to see if another storage instance has requested the same memory-mapped file and share it
         _hMapping = OpenFileMapping(FILE_MAP_READ | FILE_MAP_WRITE, FALSE, pMapName);
         if( !_hMapping ) {
            // This is the first instance acquiring the file
            LPCTSTR pFileName = context.marshal_as<const TCHAR*>(fileName);
            // Try to open the existing file, or create new one if not exists
            _hFile = CreateFile(pFileName, 
                                GENERIC_READ | GENERIC_WRITE, 
                                FILE_SHARE_READ,
                                NULL,
                                OPEN_ALWAYS,
                                FILE_ATTRIBUTE_NORMAL,
                                NULL);
            if( !_hFile )
               throw gcnew IOException(String::Format(Strings::CreateFileFailed, GetLastError(), _stationId));
            _hMapping = CreateFileMapping(_hFile, 
                                          NULL,
                                          PAGE_READWRITE | SEC_COMMIT,
                                          0,
                                          _szInBytes,
                                          pMapName);
            if( !_hMapping ) 
               throw gcnew IOException(String::Format(Strings::CreateMappingFailed, GetLastError(), _stationId));
            _usingSharedFile = false;
         } else {
            _usingSharedFile = true;
         }
      }

// _pData gives you access to the entire requested memory range, you can directly
// dereference it,  memcopy it, etc.

   _pData = (UInt32*)::MapViewOfFile(_hMapping, FILE_MAP_READ | FILE_MAP_WRITE, 0, 0, 0);

   if( !_pData ) 
      throw gcnew IOException(String::Format(Strings::MapViewOfFileFailed, ::GetLastError(), _stationId));

   // warm-up the view by touching every page
   Int32 dummy = 0;
   for( int i = 0; i < _szInBytes / sizeof(Int32); i+= 1024 ) {
      dummy ^=  _pData[i];
   }
   // return the dummy value to prevent the optimizer from removing the apparently useless loop
   _isOpen = true;
   return dummy;
}

void StationHashStorage::Cleanup() {
     if( !_disposed ) {
      // dispose unmanaged resources here
      if( _pData ) {
         if( !UnmapViewOfFile(_pData) ) 
            LOG_ERROR(Strings::UnmapViewOfFileFailed, ::GetLastError(), _stationId);
         _pData = NULL;
      }

      if( _hMapping ) {
         if( !CloseHandle(_hMapping) ) 
            LOG_ERROR(Strings::CloseMappingFailed, ::GetLastError(), _stationId);
         _hMapping = NULL;
      }


      if( _hFile ) {
         if( !CloseHandle(_hFile) ) 
            LOG_ERROR(Strings::CloseFileFailed, ::GetLastError(), _stationId);
         _hFile = NULL;
      }
      _disposed = true;
   }
}
Run Code Online (Sandbox Code Playgroud)

现在,关于你真正的问题。是否可以将生成的 ID 作为数据流的一部分嵌入?我的想法是这样的:

  1. 用一个虚拟的已知值(可能是 0xffffffff)预先写入内存的全部内容。

  2. 使用您当前的容量检查原子逻辑。

  3. 写入消息有效负载后,您立即写入计算出的消息 ID(您的容量检查需要考虑到此额外数据)

  4. 您无需使用 Interlocked.Add 来获取下一个 Id,而是进入一个循环,检查当前消息(前一个消息 Id)之前的内存,直到它与您的虚拟已知值不同。退出循环后,当前消息 Id 将为读取值 + 1。

这需要对第一条插入的消息进行一些特殊操作(因为它需要在流中播种第一个 Id 标记。您还需要小心(如果您使用长 Id 并且处于 32 位模式),您的Id 流的读取和写入是原子的。

祝你好运,我真的鼓励你尝试一下 Win32 API,如果希望事情有所改善,那将是非常有趣的!如果您需要 C++/CLI 代码方面的帮助,请随时与我联系。