Zuh*_*aib 14 c# multithreading sql-server-2008 parallel.foreach sqlfilestream
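I am extracting the contents of files stored in a SQL Server FileTable. The code below works fine as long as I don't use Parallel.
When I read the SQL file streams simultaneously (in parallel), I get the following exception:
The process cannot access the file specified because it has been opened in another transaction.
The exception above occurs when reading files from the FileTable (using GET_FILESTREAM_TRANSACTION_CONTEXT) inside Parallel.ForEach.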
https://gist.github.com/NerdPad/6d9b399f2f5f5e5c6519
Fetching the attachments and extracting their content:
    var documents = new List<ExtractedContent>();
    using (var ts = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
    {
        var attachments = await dao.GetAttachmentsAsync();
        // Extract the content simultaneously
        // documents = attachments.ToDbDocuments().ToList(); // This works
        Parallel.ForEach(attachments, a => documents.Add(a.ToDbDocument())); // this doesn't
        ts.Complete();
    }
The DAO that reads the FileTable:
    public async Task<IEnumerable<SearchAttachment>> GetAttachmentsAsync()
    {
        try
        {
            var commandStr = "....";
            IEnumerable<SearchAttachment> attachments = null;
            using (var connection = new SqlConnection(this.DatabaseContext.Database.Connection.ConnectionString))
            using (var command = new SqlCommand(commandStr, connection))
            {
                connection.Open();
                using (var reader = await command.ExecuteReaderAsync())
                {
                    attachments = reader.ToSearchAttachments().ToList();
                }
            }
            return attachments;
        }
        catch (System.Exception)
        {
            throw;
        }
    }
Creating an object for each file. Each object holds the value returned by GET_FILESTREAM_TRANSACTION_CONTEXT:
    public static IEnumerable<SearchAttachment> ToSearchAttachments(this SqlDataReader reader)
    {
        if (!reader.HasRows)
        {
            yield break;
        }
        // Convert each row to SearchAttachment
        while (reader.Read())
        {
            yield return new SearchAttachment
            {
                ...
                ...
                UNCPath = reader.To<string>(Constants.UNCPath),
                ContentStream = reader.To<byte[]>(Constants.Stream) // GET_FILESTREAM_TRANSACTION_CONTEXT()
                ...
                ...
            };
        }
    }
Reading the file with SqlFileStream. The exception is thrown here:
    public static ExtractedContent ToDbDocument(this SearchAttachment attachment)
    {
        // Read the file
        // Exception is thrown here
        using (var stream = new SqlFileStream(attachment.UNCPath, attachment.ContentStream, FileAccess.Read, FileOptions.SequentialScan, 4096))
        {
            ...
            // extract content from the file
        }
        ....
    }
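According to this post, it looks like this could be an isolation level issue. Has anyone run into a similar problem?
The transaction does not flow into Parallel.ForEach; you have to bring the transaction in manually.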
    // Switched to a thread-safe collection.
    var documents = new ConcurrentQueue<ExtractedContent>();
    using (var ts = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
    {
        var attachments = await dao.GetAttachmentsAsync();
        // Grab a reference to the current transaction.
        var transaction = Transaction.Current;
        Parallel.ForEach(attachments, a =>
        {
            // Spawn a dependent clone of the transaction
            using (var depTs = transaction.DependentClone(DependentCloneOption.RollbackIfNotComplete))
            {
                documents.Enqueue(a.ToDbDocument());
                depTs.Complete();
            }
        });
        ts.Complete();
    }
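A note on the pattern above: DependentClone only returns a transaction object; it does not by itself set Transaction.Current on the worker thread. If the code called inside the loop relies on the ambient transaction, one pattern is to wrap the clone in a TransactionScope on that thread. The following is a minimal sketch under that assumption. DependentCloneSketch, Run and HasAmbientTransaction are hypothetical names, the latter standing in for a.ToDbDocument(), and the source does not confirm that SqlFileStream needs this.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Transactions;

public static class DependentCloneSketch
{
    // Hypothetical stand-in for a.ToDbDocument(): it only reports whether
    // an ambient transaction is visible on the current thread.
    private static bool HasAmbientTransaction()
    {
        return Transaction.Current != null;
    }

    public static ConcurrentQueue<bool> Run(int itemCount)
    {
        var results = new ConcurrentQueue<bool>();
        using (var ts = new TransactionScope())
        {
            // Capture the parent transaction before going parallel.
            Transaction parent = Transaction.Current;

            Parallel.For(0, itemCount, i =>
            {
                using (DependentTransaction dep =
                    parent.DependentClone(DependentCloneOption.RollbackIfNotComplete))
                {
                    // Make the dependent clone the ambient transaction
                    // for the duration of this iteration.
                    using (var inner = new TransactionScope(dep))
                    {
                        results.Enqueue(HasAmbientTransaction());
                        inner.Complete();
                    }
                    // Signal the parent that this unit of work is done.
                    dep.Complete();
                }
            });

            ts.Complete();
        }
        return results;
    }
}
```

Calling DependentCloneSketch.Run(8) runs eight iterations, each inside its own scope over a dependent clone. The inner scope is created and disposed on the same thread within one iteration, which is what a TransactionScope without async flow expects.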
I also switched from List<ExtractedContent> to ConcurrentQueue<ExtractedContent>, because you are not allowed to call .Add( on a list from multiple threads at the same time.
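To illustrate the collection change in isolation, here is a small sketch that gathers results from Parallel.ForEach into a ConcurrentQueue and then converts them back to a list. ParallelCollectSketch, Collect and Transform are hypothetical names; Transform stands in for a.ToDbDocument() and just squares a number.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ParallelCollectSketch
{
    // Hypothetical stand-in for a.ToDbDocument().
    private static long Transform(int value)
    {
        return (long)value * value;
    }

    public static List<long> Collect(IEnumerable<int> inputs)
    {
        // ConcurrentQueue<T>.Enqueue is safe to call from several threads at once,
        // unlike List<T>.Add.
        var results = new ConcurrentQueue<long>();
        Parallel.ForEach(inputs, x => results.Enqueue(Transform(x)));

        // Items arrive in completion order, which is not deterministic,
        // so sort them if a stable order is needed.
        return results.OrderBy(x => x).ToList();
    }
}
```

Because items are enqueued as iterations finish, the queue's order generally differs from the input order. If the order matters, sort afterwards as above or carry an index alongside each result.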