Mih*_*kov 5 c# multithreading thread-safety
我正在创建驻留在内存中的仅追加数据结构,并将序列化为字节数组的记录追加到该内存。我需要它是线程安全的并且非常快,所以我想出了以下到目前为止运行良好的代码(这是一个伪代码,实际版本更复杂,并且做了其他一些工作,但这只是为了理解这一点)
public sealed class MemoryList : IDisposable
{
private int nextOffset = 0;
private readonly MemoryMappedFile file;
private readonly MemoryMappedViewAccessor va;
public MemoryList(uint capacity)
{
// Some checks on capacity here
var mapName = Guid.NewGuid().ToString("N");
this.file = MemoryMappedFile.CreateNew(mapName, capacity);
this.va = file.CreateViewAccessor(0, capacity);
}
public void AppendMessage(byte[] messagePayload)
{
if (messagePayload == null)
throw new ArgumentNullException(nameof(messagePayload));
if (messagePayload.Length == 0)
throw new ArgumentOutOfRangeException(nameof(messagePayload));
if (TryReserveCapacity(messagePayload.Length, out var offsetToWriteTo))
{
this.va.Write(offsetToWriteTo, messagePayload.Length);
this.va.WriteArray(offsetToWriteTo + sizeof(int), messagePayload, 0, messagePayload.Length);
}
}
private bool TryReserveCapacity(int dataLength, out long reservedOffset)
{
// reserve enough room to store data + its size
var packetSize = sizeof(int) + dataLength;
reservedOffset = Interlocked.Add(ref this.nextOffset, packetSize) - packetSize;
if (this.nextOffset <= this.va.Capacity)
return true;
reservedOffset = -1;
return false;
}
public void Dispose()
{
file?.Dispose();
va?.Dispose();
}
}
Run Code Online (Sandbox Code Playgroud)
这非常快,并且效果很好。无论我多么努力,我都无法打破它。
因此,现在我需要的是每个附加消息的TryReserveCapacity方法,以输出每个消息的逻辑索引。所以对于拳头消息获取索引0,第二个-指数1等,这导致使用两个调用Interlocked一个用于offset和一个用于messageIndex那些明显不是线程安全的,我可以用竞争条件导致以下情况结束。
MI:101,偏移:10000 MI:100,偏移:10500
关于如何保证没有一个MI会比另一个具有较大偏移量的MI大的想法?所有这些都没有使用任何锁?
因此,基本上,我们如何更改以下方法以使其正常运行?
private bool TryReserveCapacity(int dataLength, out long reservedOffset, out long messageId)
{
// reserve enough room to store data + its size
var packetSize = sizeof(int) + dataLength;
reservedOffset = Interlocked.Add(ref this.nextOffset, packetSize) - packetSize;
messageId = Interlocked.Increment(ref this.currentMessageId);
if (this.nextOffset <= this.va.Capacity)
return true;
reservedOffset = -1;
return false;
}
Run Code Online (Sandbox Code Playgroud)
PS我知道示例代码的字节序问题,但是正如我所说的,只是将其视为说明问题的伪代码。
抱歉,如果这没有直接解决您的主要问题(非锁定原子性),但我发现您正在使用 和MemoryMappedFile类来操作内存映射文件MemoryMappedViewAccessor。
我真的不知道 .NET Framework 的当前迭代是否已经解决了这个问题,但是在我们大约三年前编写的代码库中,我们发现使用这些类进行内存映射文件操作的性能非常差(如果速度慢大约 7 倍)我没记错),与使用 Win32 API 和映射内存的直接指针操作相比,即使在托管C++/CLI 类中也是如此。
我强烈建议您测试一下这种方法,您可能会对性能提升感到惊讶(正如我们确实所做的那样),并且性能提升可能非常显着,以至于您可以负担标准锁定的成本来实现您想要的原子性。
如果您想探索这一途径,这里有一个代码片段,展示了该技术的基础知识。
Int32 StationHashStorage::Open() {
msclr::lock lock(_syncRoot);
if( _isOpen )
return 0;
String^ fileName = GetFullFileName();
_szInBytes = ComputeFileSizeInBytes(fileName);
String^ mapExtension = GetFileExtension();
String^ mapName = String::Format("{0}{1}_{2}", _stationId, _date.ToString("yyyyMMdd"), mapExtension);
marshal_context context;
LPCTSTR pMapName = context.marshal_as<const TCHAR*>(mapName);
{
msclr::lock lock( _openLock );
// Try to see if another storage instance has requested the same memory-mapped file and share it
_hMapping = OpenFileMapping(FILE_MAP_READ | FILE_MAP_WRITE, FALSE, pMapName);
if( !_hMapping ) {
// This is the first instance acquiring the file
LPCTSTR pFileName = context.marshal_as<const TCHAR*>(fileName);
// Try to open the existing file, or create new one if not exists
_hFile = CreateFile(pFileName,
GENERIC_READ | GENERIC_WRITE,
FILE_SHARE_READ,
NULL,
OPEN_ALWAYS,
FILE_ATTRIBUTE_NORMAL,
NULL);
if( !_hFile )
throw gcnew IOException(String::Format(Strings::CreateFileFailed, GetLastError(), _stationId));
_hMapping = CreateFileMapping(_hFile,
NULL,
PAGE_READWRITE | SEC_COMMIT,
0,
_szInBytes,
pMapName);
if( !_hMapping )
throw gcnew IOException(String::Format(Strings::CreateMappingFailed, GetLastError(), _stationId));
_usingSharedFile = false;
} else {
_usingSharedFile = true;
}
}
// _pData gives you access to the entire requested memory range, you can directly
// dereference it, memcopy it, etc.
_pData = (UInt32*)::MapViewOfFile(_hMapping, FILE_MAP_READ | FILE_MAP_WRITE, 0, 0, 0);
if( !_pData )
throw gcnew IOException(String::Format(Strings::MapViewOfFileFailed, ::GetLastError(), _stationId));
// warm-up the view by touching every page
Int32 dummy = 0;
for( int i = 0; i < _szInBytes / sizeof(Int32); i+= 1024 ) {
dummy ^= _pData[i];
}
// return the dummy value to prevent the optimizer from removing the apparently useless loop
_isOpen = true;
return dummy;
}
void StationHashStorage::Cleanup() {
if( !_disposed ) {
// dispose unmanaged resources here
if( _pData ) {
if( !UnmapViewOfFile(_pData) )
LOG_ERROR(Strings::UnmapViewOfFileFailed, ::GetLastError(), _stationId);
_pData = NULL;
}
if( _hMapping ) {
if( !CloseHandle(_hMapping) )
LOG_ERROR(Strings::CloseMappingFailed, ::GetLastError(), _stationId);
_hMapping = NULL;
}
if( _hFile ) {
if( !CloseHandle(_hFile) )
LOG_ERROR(Strings::CloseFileFailed, ::GetLastError(), _stationId);
_hFile = NULL;
}
_disposed = true;
}
}
Run Code Online (Sandbox Code Playgroud)
现在,关于你真正的问题。是否可以将生成的 ID 作为数据流的一部分嵌入?我的想法是这样的:
用一个虚拟的已知值(可能是 0xffffffff)预先写入内存的全部内容。
使用您当前的容量检查原子逻辑。
写入消息有效负载后,您立即写入计算出的消息 ID(您的容量检查需要考虑到此额外数据)
您无需使用 Interlocked.Add 来获取下一个 Id,而是进入一个循环,检查当前消息(前一个消息 Id)之前的内存,直到它与您的虚拟已知值不同。退出循环后,当前消息 Id 将为读取值 + 1。
这需要对第一条插入的消息进行一些特殊操作(因为它需要在流中播种第一个 Id 标记。您还需要小心(如果您使用长 Id 并且处于 32 位模式),您的Id 流的读取和写入是原子的。
祝你好运,我真的鼓励你尝试一下 Win32 API,如果希望事情有所改善,那将是非常有趣的!如果您需要 C++/CLI 代码方面的帮助,请随时与我联系。