Я T*_*bur 0 c# asynchronous system.reactive async-await rx.net
我正在尝试在我的 Kafka 消费者中使用 Rx。
public static event EventHandler<ConsumeResult<string, string>> GenericEvent;
Run Code Online (Sandbox Code Playgroud)
然后我有以下代码
var observable = Observable.FromEventPattern<ConsumeResult<string, string>>(
x => GenericEvent += x,
x => GenericEvent -= x)
.Select(x => x.EventArgs);
while (!cancellationToken.IsCancellationRequested)
{
ConsumeResult<string, string> consumeResult = _consumer.Consume(cancellationToken);
GenericEvent(consumeResult.Topic, consumeResult);
}
Run Code Online (Sandbox Code Playgroud)
然后在某处我使用它
var subscription = observable.Subscribe(message =>
{
Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} ** {message.Topic}/{message.Partition} @{message.Offset}: '{message.Value}'");
//_kafkaConsumer.Commit(messages);
});
Run Code Online (Sandbox Code Playgroud)
是否有可能按主题名称 ( consumeResult.Topic)运行单独的线程?当消费者收到一条消息时,它应该将其按主题重定向到相应的线程
试一试:
Observable
.Interval(TimeSpan.FromSeconds(0.1))
.Take(20)
.GroupBy(x => x % 3)
.SelectMany(xs => Observable.Using(() => new EventLoopScheduler(), els => xs.ObserveOn(els)))
.Subscribe(x => Console.WriteLine($"{x} {Thread.CurrentThread.ManagedThreadId}"));
Run Code Online (Sandbox Code Playgroud)
这确保在new EventLoopScheduler()调度程序中为GroupBy操作员创建的每个内部 observable 创建一个线程。该SelectMany变平的组,但保持EventLoopScheduler对每个组相关联。
在你的情况下,你GroupBy的consumeResult.Topic财产。
请确保您的源可观察对象结束,因为线程永远存在,直到它们结束。调用Dispose()订阅就足以结束 observable。
| 归档时间: |
|
| 查看次数: |
82 次 |
| 最近记录: |