按组在单独的线程中运行处理

Я T*_*bur 0 c# asynchronous system.reactive async-await rx.net

我正在尝试在我的 Kafka 消费者中使用 Rx。

public static event EventHandler<ConsumeResult<string, string>> GenericEvent;
Run Code Online (Sandbox Code Playgroud)

然后我有以下代码

var observable = Observable.FromEventPattern<ConsumeResult<string, string>>(
     x => GenericEvent += x,
     x => GenericEvent -= x)
  .Select(x => x.EventArgs);

while (!cancellationToken.IsCancellationRequested)
{
    ConsumeResult<string, string> consumeResult = _consumer.Consume(cancellationToken);
    GenericEvent(consumeResult.Topic, consumeResult);
}
Run Code Online (Sandbox Code Playgroud)

然后在某处我使用它

var subscription = observable.Subscribe(message =>
{
    Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} ** {message.Topic}/{message.Partition} @{message.Offset}: '{message.Value}'");

    //_kafkaConsumer.Commit(messages);
});
Run Code Online (Sandbox Code Playgroud)

是否有可能按主题名称 ( consumeResult.Topic)运行单独的线程?当消费者收到一条消息时,它应该将其按主题重定向到相应的线程

在此处输入图片说明

Eni*_*ity 5

试一试:

Observable
    .Interval(TimeSpan.FromSeconds(0.1))
    .Take(20)
    .GroupBy(x => x % 3)
    .SelectMany(xs => Observable.Using(() => new EventLoopScheduler(), els => xs.ObserveOn(els)))
    .Subscribe(x => Console.WriteLine($"{x} {Thread.CurrentThread.ManagedThreadId}"));
Run Code Online (Sandbox Code Playgroud)

这确保在new EventLoopScheduler()调度程序中为GroupBy操作员创建的每个内部 observable 创建一个线程。该SelectMany变平的组,但保持EventLoopScheduler对每个组相关联。

在你的情况下,你GroupByconsumeResult.Topic财产。

请确保您的源可观察对象结束,因为线程永远存在,直到它们结束。调用Dispose()订阅就足以结束 observable。

  • @Chebur - `Observable.Using(() =&gt; new EventLoopScheduler(), els =&gt; xs.ObserveOn(els))` 代码使用每个 `new EventLoopScheduler()` 创建一个新线程。在每个“EventLoopScheduler”调用“Dispose()”之前,这些线程不会退出。当外部可观察完成时就会发生这种情况。在我的示例中,我使用了 `Take(20)` 使其在 `20` 元素之后自然停止,但是如果您没有自然结束,那么您需要处置订阅以使其结束,并且允许新线程自然退出。 (2认同)