Reactive Extensions(RX)自定义调度程序C#

ben*_*mos 6 c# system.reactive

是否可以构建一个自定义调度程序,可以检查通过IObservable的每个元素的值,以便决定处理该项目的线程?

我需要按顺序处理具有相同键的项目,但并行处理不同的键.让RX进行调度是有意义的,而不是为了将每个值分配给线程而必须比我想要的更早离开observable.

Bra*_*don 8

你试过GroupBy跟着ObserveOn吗?

就像是:

source
    .GroupBy(item => item.Key)
    .SelectMany(group => group
        .ObserveOn(Scheduler.NewThread)
        .Select(item => process(item))
    )
    .Subscribe(processResult => ...);
Run Code Online (Sandbox Code Playgroud)

这将按键对流进行分区,为每个键启动一个新线程,并process()为该键中的每个项运行.

  • 是的,它可能是'ObserveOn`的工作方式.你可以使用`GroupByUntil(item => item.Key,group => group.Throttle(TimeSpan.FromSeconds(30))`,这会导致密钥(及其线程)在最近没有看到密钥时关闭(如果密钥再次显示,它将重新启动). (2认同)