我想await在BlockingCollection<T>.Take()异步的结果,所以我不阻止线程.寻找这样的事情:
var item = await blockingCollection.TakeAsync();
Run Code Online (Sandbox Code Playgroud)
我知道我可以这样做:
var item = await Task.Run(() => blockingCollection.Take());
Run Code Online (Sandbox Code Playgroud)
但是有点杀死了整个想法,因为另一个线程ThreadPool被阻止了.
还有其他选择吗?
如果Queue中没有项目,ConcurrentQueue中的TryDequeue将返回false.
如果队列是空的,我需要我的队列将等待,直到新项目被添加到队列中并且它将新队列出列,并且该过程将继续这样.
我应该在C#4.0中使用monitor.enter,wait,pulse或任何更好的选项
我一直在看到Cancellation.Register与结果中的using子句一起使用的代码CancellationTokenRegistration:
using (CancellationTokenRegistration ctr = token.Register(() => wc.CancelAsync()))
{
await wc.DownloadStringAsync(new Uri("http://www.hamster.com"));
}
Run Code Online (Sandbox Code Playgroud)
我得到你应该确保你Dispose的IDisposable,但为什么它甚至工具IDisposable?有什么资源需要发布?它认为平等的唯一方法.
如果你不这样做Dispose会怎么样?你泄漏了什么?
.net c# idisposable task-parallel-library cancellation-token
我正在使用生产者/消费者模式实现数据链路层.数据链路层有自己的线程和状态机,通过线路传输数据链路协议(以太网,RS-232 ......).物理层的接口表示为System.IO.Stream.另一个线程将消息写入数据链接对象并从中读取消息.
数据链接对象具有空闲状态,必须等待以下四种情况之一:
我很难找到最好的方法来实现这一点,而无需将通信分成读/写线程(从而大大增加了复杂性).以下是我如何获得4分中的3分:
// Read a byte from 'stream'. Timeout after 10 sec. Monitor the cancellation token.
stream.ReadTimeout = 10000;
await stream.ReadAsync(buf, 0, 1, cts.Token);
Run Code Online (Sandbox Code Playgroud)
要么
BlockingCollection<byte[]> SendQueue = new ...;
...
// Check for a message from network layer. Timeout after 10 seconds.
// Monitor cancellation token.
SendQueue.TryTake(out msg, 10000, cts.Token);
Run Code Online (Sandbox Code Playgroud)
我该怎么做才能阻止线程,等待所有四个条件?欢迎所有建议.我没有设置任何架构或数据结构.
编辑:********感谢大家的帮助.这是我的解决方案********
首先,我认为没有生产者/消费者队列的异步实现.所以我实现了类似于这个stackoverflow帖子的东西.
我需要一个外部和内部取消源来分别停止使用者线程并取消中间任务,类似于本文.
byte[] buf = new byte[1];
using (CancellationTokenSource internalTokenSource = new CancellationTokenSource())
{
CancellationToken …Run Code Online (Sandbox Code Playgroud) 假设我有很多生产者,1个消费者未绑定 Channel,有一个消费者:
await foreach (var message in channel.Reader.ReadAllAsync(cts.Token))
{
await consume(message);
}
Run Code Online (Sandbox Code Playgroud)
问题在于该consume函数会进行一些 IO 访问,并且可能还会进行一些网络访问,因此在消耗 1 条消息之前可能会产生更多消息。但由于IO资源不能并发访问,所以我不能有很多消费者,也不能把函数扔到consume一个Task中然后忘记它。
该consume功能可以轻松修改以获取多条消息并批量处理它们。所以我的问题是,是否有一种方法可以让消费者在尝试访问通道队列时获取通道队列中的所有消息,如下所示:
while (true) {
Message[] messages = await channel.Reader.TakeAll();
await consumeAll(messages);
}
Run Code Online (Sandbox Code Playgroud)
编辑:我能想到的 1 个选项是:
List<Message> messages = new();
await foreach (var message in channel.Reader.ReadAllAsync(cts.Token))
{
await consume(message);
Message msg;
while (channel.Reader.TryRead(out msg))
messages.Add(msg);
if (messages.Count > 0)
{
await consumeAll(messages);
messages.Clear();
}
}
Run Code Online (Sandbox Code Playgroud)
但我觉得这应该是一个更好的方法来做到这一点。
我希望在同一台机器上有两个线程,一个用于填充集合,另一个用于在数据可用时立即弹出数据,并在知道结束时停止.我只是不知道要使用什么样的集合......
private void DataProviderThread()
{
GlobalCollection = new SomeMagicCollection();
for (int i = 0; i < 100; i++)
{
GlobalCollection.Add(new SomeDataItem(i));
Thread.Sleep(100);
}
GlobalCollection.IHaveFinishedPopulatingThanksAndBye();
}
private void DataCruncherThread()
{
foreach (var item in GlobalCollection)
{
// Do whatever
}
// The control should exit foreach only once the data provider states that the collection is finished
}
Run Code Online (Sandbox Code Playgroud)
然后我想简单地迭代它,让Collection照顾好
IHaveFinishedPopulatingThanksAndBye(),然后干净地退出循环我无法相信C#不会在新版本中发布此版本.但它的名字是什么?
两者Queue和ConcurrentQueue实施IEnumerable但不是IAsyncEnumerable。NuGet 上是否有可用的标准类或类来实现IAsyncEnumerable,如果队列为空,则在将MoveNextAsync下一个添加到队列之前,结果不会完成?
我有几种方法可以向数据库报告一些数据。我们希望异步调用对数据服务的所有调用。这些对数据服务的调用都结束了,因此我们要确保这些 DS 调用在任何给定时间按顺序依次执行。最初,我在这些方法中的每一个上使用 async await 并且每个调用都是异步执行的,但我们发现如果它们乱序,那么就有出错的空间。
所以,我认为我们应该将所有这些异步任务排队并在一个单独的线程中发送它们,但我想知道我们有哪些选择?我遇到了 'SemaphoreSlim' 。这在我的用例中是否合适?或者还有哪些其他选项适合我的用例?请指导我。
所以,我目前在我的代码中有什么
public static SemaphoreSlim mutex = new SemaphoreSlim(1);
//first DS call
public async Task SendModuleDataToDSAsync(Module parameters)
{
var tasks1 = new List<Task>();
var tasks2 = new List<Task>();
//await mutex.WaitAsync(); **//is this correct way to use SemaphoreSlim ?**
foreach (var setting in Module.param)
{
Task job1 = SaveModule(setting);
tasks1.Add(job1);
Task job2= SaveModule(GetAdvancedData(setting));
tasks2.Add(job2);
}
await Task.WhenAll(tasks1);
await Task.WhenAll(tasks2);
//mutex.Release(); // **is this correct?**
}
private async Task SaveModule(Module setting)
{
await Task.Run(() …Run Code Online (Sandbox Code Playgroud) c# ×8
asynchronous ×3
.net ×2
async-await ×2
collections ×2
queue ×2
channel ×1
idisposable ×1
semaphore ×1