我正在使用带有> 的生产者/消费者模式System.Collection.Concurrent.BlockingCollection<DataTable来从数据库(生产者)检索数据并在数据(消费者)上创建Lucene索引.
Producer一次抓取10000条记录并将该组添加到BlockingCollection<DataTable>.然后消费者(稍微慢一点)抓住那些10000并创建一个索引.
阻塞集合被限制为<DataTable>每个10000行中的5个.
起初程序运行得很好,但是在它获得大约150000行后,我注意到我的计算机内存最大化并且它慢慢爬行.
似乎BlockingCollection无法null在获取项目后将基础数组槽设置为.
码:
private static LuceneIndex index;
private static BlockingCollection<DataTable> blockingCol;
private static void Producer()
{
while (true)
{
//...get next 10000 rows
DataTable data = GetNextSet();
if(data.Row.Count > 0)
blockingCol.Add(products);
else
break;
}
}
private static void Consumer()
{
while (!BlockingCol.IsCompleted || BlockingCol.Count > 0)
{
DataTable data = blockingCol.Take();
index.UpdateIndex(GetLuceneDocs(data));
}
}
public static void Main(System.String[] args)
{
index = new LuceneIndex();
blockingCol …Run Code Online (Sandbox Code Playgroud) 我使用的BlockingCollection{T}是仅由一个线程填充且仅由一个线程消耗的线程。生产和消费物品运行良好。问题出在这个操作的最后。任务在 处阻塞(如预期)GetConsumingEnumerable。CompleteAdding调用任务后将处理BlockingCollection并完成,没有任何异常。到目前为止,一切都很好。
现在我有一个线程将项目添加到BlockingCollection. 该线程必须进行测试IsAddingCompleted,然后必须添加该项目。IsAddingCompleted但请求和添加项目之间存在竞争条件。有一个TryAdd- 方法,但如果添加已经完成,也会引发异常。
如何在没有额外锁定的情况下添加已完成的项目或测试?为什么会TryAdd抛出异常?如果添加已经完成,返回false即可。
非常简化的代码如下所示:
private BlockingCollection<string> _items = new BlockingCollection<string>();
public void Start()
{
Task.Factory.StartNew(
() =>
{
foreach (var item in this._items.GetConsumingEnumerable())
{
}
this._items.Dispose();
});
Thread.Sleep(50); // Wait for Task
this._items.CompleteAdding(); // Complete adding
}
public void ConsumeItem(string item)
{
if (!this._items.IsAddingCompleted)
{
this._items.Add(item);
}
}
Run Code Online (Sandbox Code Playgroud)
是的,我知道这段代码没有意义,因为几乎没有机会添加任何项目,并且 foreach 循环没有注意到。消耗任务对我的问题来说并不重要。
问题如ConsumeItem-method 所示。我可以添加一个额外的锁(信号量)ConsumeItem和 …
我有点困惑BlockingConnection和AsyncoreConnection.我想从Django应用程序向RabbitMQ队列发送一些消息.使用全局BlockingConnection对象可以吗?
谢谢.
我将后台任务添加到阻止集合(在后台添加).
我在GetConsumingEnumerable返回的Enumerable上等待Task.WhenAll.
我的问题是:Task.WhenAll的重载是否接收到IEnumerable"准备好"可能会收到无穷无尽的任务?
我只是不确定我是否可以这样做,或者它是否意味着以这种方式使用?
private async Task RunAsync(TimeSpan delay, CancellationToken cancellationToken)
{
using (BlockingCollection<Task> jobcollection = new BlockingCollection<Task>())
{
Task addingTask = Task.Run(async () =>
{
while (true)
{
DateTime utcNow = DateTime.UtcNow;
var jobs = Repository.GetAllJobs();
foreach (var job in GetRootJobsDue(jobs, utcNow))
{
jobcollection.Add(Task.Run(() => RunJob(job, jobs, cancellationToken, utcNow), cancellationToken), cancellationToken);
}
await Task.Delay(delay, cancellationToken);
}
}, cancellationToken);
await Task.WhenAll(jobcollection.GetConsumingEnumerable(cancellationToken));
}
}
Run Code Online (Sandbox Code Playgroud) 我的程序有3个功能.每个函数都会获取一个项目列表并填充某些信息.例如
class Item {
String sku,upc,competitorName;
double price;
}
Run Code Online (Sandbox Code Playgroud)
函数F1采用List并填充upc
功能F2取List(F1的输出)并填写价格.
功能F3取List(输出F2)并填充competitorName
F1可以一次处理5个项目, F2可以一次处理20个项目,F3也可以处理20个项目 .
现在我正在连续运行F1 - > F2 - > F3,因为F2需要来自F1的信息(UPC代码).F3需要F2的价格.
我想通过连续运行F1运行而不是等待F2和F3完成来使这个过程高效.F1执行并输出到队列中,然后F2一次取20个项目并处理它们.然后跟随F3.
如何通过使用BlockingCollection和Queue来实现这一目标?
我有BlockingCollection重复的元素.如何修改它以添加或获取不同的元素?
我订阅了一项服务,该服务将在收到新元素时引发Event,然后将此元素添加到中BlockingCollection。
我正在运行第二个线程,该线程将循环运行BlockingCollection以添加/更新可观察集合中的元素。
问题是您如何添加ObservableCollection?我知道我不能只.add对这种类型的集合执行操作,因为它需要在UI线程上完成。所以我尝试使用不同的ObservableCollection子类,该子类使用分派器来编组元素的添加,但是每一次,我都会遇到相同的错误
“未知模块中发生了'System.StackOverflowException类型的未处理的异常。”
故障排除提示如下:
确保您没有无限循环或无限递归。
好吧,实际上,我确实有某种无限循环,因为一直在BlockingQueue接收新元素,例如每秒5到10。
如果我不将控件绑定到observablecollectionough,或者使用List代替,则不会出现异常。
Class ElementHolder
{
private ExternalService _externalService;
private ObservableCollection<Element> _elementsList = new ObservableCollection<Element>();
private BlockingCollection<Element> _elementsReceived = new BlockingCollection<Element>();
public ObservableCollection<Element> ElementsList
{
get
{
return _elementList;
}
set
{
_elementList = value;
}
}
public ElementHolder()
{
Task.Factory.StartNew(ReadElements);
_externalService = new ExternalService();
_externalService.ReceivedNewElement += new Action<Element>(o => _elementsReceived.Add(o));
_externalService.Subscribe();
}
private void ReadElements()
{
foreach (Element element in …Run Code Online (Sandbox Code Playgroud) 有没有办法批量阻止集合中的项集合.例如
我有一个消息传递总线发布者调用blockingCollection.Add()
还有一个像这样创建的消费线程:
Task.Factory.StartNew(() =>
{
foreach (string value in blockingCollection.GetConsumingEnumerable())
{
Console.WriteLine(value);
}
});
Run Code Online (Sandbox Code Playgroud)
但是,我只希望控制台在阻塞集合上有10个项目后写入,而GetConsumingEnumerable()总是在每个项目添加后触发.我可以为此编写自己的队列但是如果可能的话我想使用阻塞集合吗?
我已经编写了一个复制该问题的示例测试。这不是我的实际代码,我尝试编写一个小代码。如果将边界容量增加到迭代次数,从而有效地使其没有边界,则不会出现死锁,如果将最大并行度设置为较小的数字(例如 1),则不会出现死锁。
再说一次,我知道下面的代码不是很好,但我实际发现的代码要大得多并且难以理解。基本上有一个与远程资源的连接的阻塞对象池,并且流中的几个块使用了该连接。
关于如何解决这个问题有什么想法吗?乍一看,这似乎是数据流的问题。当我中断查看线程时,我看到许多线程在 Add 上被阻塞,0 个线程在 take 上被阻塞。addBlocks 出站队列中有几个项目尚未传播到 takeblock,因此它被卡住或死锁。
var blockingCollection = new BlockingCollection<int>(10000);
var takeBlock = new ActionBlock<int>((i) =>
{
int j = blockingCollection.Take();
}, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 20,
SingleProducerConstrained = true
});
var addBlock = new TransformBlock<int, int>((i) =>
{
blockingCollection.Add(i);
return i;
}, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 20
});
addBlock.LinkTo(takeBlock, new DataflowLinkOptions()
{
PropagateCompletion = true
});
for (int i = 0; i < 100000; i++)
{
addBlock.Post(i);
}
addBlock.Complete();
await addBlock.Completion; …Run Code Online (Sandbox Code Playgroud) c# multithreading task-parallel-library blockingcollection tpl-dataflow
我一直在试图让我的头部周围的阻塞收集和我碰到Take()和TryTake()也Add()和TryAdd()
我知道如果没有要取的项目,Take()将等到添加一个项目,类似地,Add()如果集合已达到其最大限制,它将等到项目被删除。
“如果集合大小有界,Add 和 TryAdd 可能会阻塞;当集合为空时,Take 和 TryTake 会阻塞。”
因此Take(),TryTake()两者都等待添加项目。那么,如果我们不提供任何超时或取消令牌,Take()和之间有什么区别TryTake(),不应该TryTake() return false直接而不是等待?和一样TryAdd()吗?
c# ×7
.net ×2
async-await ×1
asynchronous ×1
django ×1
memory-leaks ×1
pika ×1
python ×1
rabbitmq ×1
task ×1
tpl-dataflow ×1
wpf ×1