我的工作假设是,当与System.Collections.Concurrent集合(包括ConcurrentDictionary)一起使用时,LINQ是线程安全的.
(其他Overflow帖子似乎同意:链接)
但是,对LINQ OrderBy扩展方法的实现的检查表明,对于实现ICollection的并发集合的子集(例如ConcurrentDictionary),它似乎不是线程安全的.
所述OrderedEnumerable 的GetEnumerator(源此处)构造一个实例缓冲结构(源这里它试图收集强制转换为)的ICollection(其ConcurrentDictionary实现),然后用初始化为集合的大小的阵列执行collection.CopyTo.
因此,如果在OrderBy操作期间ConcurrentDictionary(在这种情况下为具体的ICollection)的大小增加,在初始化数组和复制到它之间,此操作将抛出.
以下测试代码显示此异常:
(注意:我很欣赏在一个线程安全的集合上执行一个OrderBy,这个集合在你下面发生变化并没有那么有意义,但我不相信它应该抛出)
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Program
{
class Program
{
static void Main(string[] args)
{
try
{
int loop = 0;
while (true) //Run many loops until exception thrown
{
Console.WriteLine($"Loop: {++loop}");
_DoConcurrentDictionaryWork().Wait();
}
}
catch …Run Code Online (Sandbox Code Playgroud) 例如以下代码线程安全:
ConcurrentQueue<Guid> _queue = new ConcurrentQueue<Guid>();
while(true)
{
for(int y = 0; y < 3; y++)
{
if(y % 3 == 0)
{
System.Threading.Tasks.Task.Run(() => _queue.Enqueue(Guid.NewGuid()));
}
else if (y % 3 == 1)
{
Guid x;
System.Threading.Tasks.Task.Run(() => _queue.TryDequeue(out x));
}
else if(y % 3 == 2)
{
System.Threading.Tasks.Task.Run(() =>
{
if (_queue.Any(t => t == testGuid))
{
// Do something
}
});
}
}
Run Code Online (Sandbox Code Playgroud)
编辑:显然标题不够清晰,所以更新了代码示例以包含实际的多线程行为,是的,上面的代码只是多线程行为的一个示例.
我得到一个难以复制的错误在下面的程序中,更新平行,主线程并发字典中的线程数显示后固定的时间间隔字典排序顺序的状态,直到所有的更新线程完成.
public void Function(IEnumerable<ICharacterReader> characterReaders, IOutputter outputter)
{
ConcurrentDictionary<string, int> wordFrequencies = new ConcurrentDictionary<string, int>();
Thread t = new Thread(() => UpdateWordFrequencies(characterReaders, wordFrequencies));
bool completed = false;
var q = from pair in wordFrequencies orderby pair.Value descending, pair.Key select new Tuple<string, int>(pair.Key, pair.Value);
t.Start();
Thread.Sleep(0);
while (!completed)
{
completed = t.Join(1);
outputter.WriteBatch(q);
}
}
Run Code Online (Sandbox Code Playgroud)
该函数给出了字符流和输出器的列表.该函数维护从每个字符流中读取的字的字频的并发字典(并行).的文字,被一个新的线程读入,并且,直到所有的输入流已被读出的主线程输出词典的当前状态(以排序的顺序),每1毫秒(在实践中,输出将是什么每10秒等,但错误似乎只出现在非常小的值上.WriteBatch函数只是写入控制台:
public void WriteBatch(IEnumerable<Tuple<string, int>> batch)
{
foreach (var tuple in batch)
{
Console.WriteLine("{0} - {1}", tuple.Item1, tuple.Item2);
}
Console.WriteLine();
}
Run Code Online (Sandbox Code Playgroud)
大多数执行都很好,但有时我在WriteBatch函数的foreach语句中得到以下错误:
"未处理的异常:System.ArgumentException:索引等于或大于数组的长度,或者字典中的元素数量比索引到目标数组末尾的可用空间大."
如果主线程在启动更新线程之后和启动显示循环之前暂停一段时间,则错误确实会消失.如果删除了orderby子句并且未在linq查询中对字典进行排序,它似乎也会消失.有什么解释吗?
foreach (var …
c# concurrency multithreading race-condition concurrentdictionary