我正在规划一个订阅者和发布者系统,我计划在其中使用频道。我想记录我使用的每个通道中的元素数量,以便我可以根据系统中的瓶颈调整发布者/订阅者的数量。
我开始将Channel,ChannelReader和包装ChannelWriter到我自己的类中来计算写入和读取的数量,但这感觉就像一个黑客。有没有更好的办法?
我有一个 asp.net core 应用程序“A”,它每 1 分钟在一个文件夹中生成文件。
应用程序“B”想要一个通知或文件详细信息我们生成的文件以及该文件的一些哈希信息。根据此通知,应用程序“B”想要处理这些文件。
我正在考虑一些发布/订阅机制,我想要非常轻量级的组件,其中应用程序“A”将发布文件相关信息,应用程序“B”将订阅和侦听。
“system.threading.channels”会解决这个问题吗?
我有以下代码:
var channel = Channel.CreateUnbounded<string>();
var consumers = Enumerable
.Range(1, 5)
.Select(consumerNumber =>
Task.Run(async () =>
{
var rnd = new Random();
while (await channel.Reader.WaitToReadAsync())
{
if (channel.Reader.TryRead(out var item))
{
Console.WriteLine($"Consuming {item} on consumer {consumerNumber}");
}
}
}));
var producers = Enumerable
.Range(1, 5)
.Select(producerNumber =>
Task.Run(async () =>
{
var rnd = new Random();
for (var i = 0; i < 10; i++)
{
var t = $"Message {i}";
Console.WriteLine($"Producing {t} on producer {producerNumber}");
await channel.Writer.WriteAsync(t);
await Task.Delay(TimeSpan.FromSeconds(rnd.Next(3))); …Run Code Online (Sandbox Code Playgroud) 我最近对我的框架进行了基准测试,发现它分配了大量垃圾。
我正在使用 andChannel<T>或TryRead操作ReadAsync在每次调用时分配内存。所以我用 a 交换了它,BlockingCollection<T>它也在 期间分配内存TryTake。
我使用了一个带有单个写入器/读取器的无界通道。还有一个正常的BlockingCollection<T>。
// Each thread runs this, jobmeta is justa struct
while (!token.IsCancellationRequested)
{
var jobMeta = await Reader.ReadAsync(token); // <- allocs here
jobMeta.Job.Execute();
jobMeta.JobHandle.Notify();
}
Run Code Online (Sandbox Code Playgroud)
探查器告诉我,所有分配都是由该ChannelReader.ReadAsync方法引起的。不幸的是,我无法显示完整的代码,但是由于我在热路径中使用它们,所以我需要不惜一切代价避免分配。
是否有任何替代方案在读/写/获取期间不分配内存并且行为相同(生产者/消费者多线程的并发类)?我怎样才能自己实现一个?
c# multithreading asynchronous producer-consumer system.threading.channels
我正在开发一个 .net core 3.0 web 应用程序,并决定在单例服务中使用 System.Threading.Channels。我的范围请求服务的顶层注入这个单例来访问它的通道。
我决定使用这种模式将请求(为其他连接的客户端生成实时更新)与这些更新的执行分离。
在一个对象中实现一个通道有很多例子。
谁能告诉我是否可以/建议在我的单身人士中使用多个频道?
我还没有遇到创建多个通道并在创建单例时“启动”它们的任何问题。我只是还没有达到可以测试多个客户端请求点击单例上不同通道以查看它是否运行良好的地步。(或者根本没有?...)
我使用多个通道的主要动机是我希望单例根据通道中项目的类型做不同的事情。
public class MyChannelSingleton
{
public Channel<MyType> TypeOneChannel = Channel.CreateUnbounded<MyType>();
public Channel<MyOtherType> TypeTwoChannel = Channel.CreateUnbounded<MyOtherType>();
public MyChannelSingleton()
{
StartChannels();
}
private void StartChannels()
{
// discarded async tasks due to calling in ctor
_ = StartTypeOneChannel();
_ = StartTypeTwoChannel();
}
private async Task StartTypeOneChannel()
{
var reader = TypeOneChannel.Reader;
while (await reader.WaitToReadAsync())
{
if (reader.TryRead(out MyType item))
{
// item is sucessfully read from channel
}
}
}
private …Run Code Online (Sandbox Code Playgroud) 我正在使用一个第三方库,它充当发布-订阅消息代理的接口。经纪人是 Solace PubSub+。
对于订阅者,供应商库采用“通过回调推送消息”模式。
我正在围绕供应商库编写一个自己的包装器库,以便其他开发人员更容易使用(隐藏库与网络通信方式的所有内部结构等)。
同样,我认为将订户提要公开为 可能会有所帮助IAsyncEnumerable,并且我认为这可能是 的一个很好的用例System.Threading.Channels。我有两个担忧:
EnumerableBroker,还是我在某个地方陷入了异步陷阱?我意识到第一个问题可能比 SO 更适合 CodeReview,但由于该问题的答案也与第二个问题相关,因此将它们放在一起似乎是合适的。值得注意的是:我正在避免使用IObservable/Rx,因为我的目标是让我的界面比供应商的界面更基础,而不是要求其他开发人员和我自己学习 Rx!对于中间的通道,理解生产者和消费者进程如何独立也是微不足道的,而对于可观察的,我的第一个心理过程是“好吧,那么生产者和消费者仍然是独立的吗?乍一看,我必须现在就了解调度程序...天哪,我只使用一个怎么样await foreach?”
这是消费消息的最小模型,没有EnumerableBroker:
// mockup of third party class
private class Broker
{
// mockup of how the third party library pushes messages via callback
public void Subscribe(EventHandler<int> handler) => this.handler = handler;
//simulate the broker pushing messages. Not "real" code
public void Start()
{
Task.Run
(
() =>
{
for …Run Code Online (Sandbox Code Playgroud)