标签: system.reactive

如何在Reactive Extensions中将项目缓冲到组中?

我有一个IObservable; 其中属性更改具有实体ID和PropertyName.我想用它来更新数据库,但是如果多个属性几乎同时发生变化,我只想对同一个实体的所有属性进行一次更新.

如果这是一个静态IEnumerable并且我使用LINQ我可以简单地使用:

MyList.GroupBy(C=>C.EntityID);
Run Code Online (Sandbox Code Playgroud)

但是,列表永远不会终止(从不调用IObserver.OnComplete).我想要做的是等待一段时间,比如1秒钟,将所有呼叫分组适当的一秒钟.

理想情况下,我会为每个EntityID设置单独的计数器,只要找到该EntityID的新属性更改,它们就会重置.

我不能使用像Throttle这样的东西,因为我想处理所有的属性更改,我只是想一次性处理它们.

c# linq system.reactive observable observer-pattern

3
推荐指数
1
解决办法
1931
查看次数

是否可以在Rx中的不同线程上调用订阅者的OnNexts?

我是Rx的新手.我想知道是否可以向不同的订阅者发送消息,以便它们在不同的线程上运行?IObserable如何控制呢?简单的Subject实现,据我所知,它在一个线程上一个接一个地调用订阅者.


public class Subsciber : IObserver<int>
{
    public void OnNext(int a)
    {
        // Do something
    }
    public void OnError(Exception e)
    {
        // Do something
    }
    public void OnCompeleted()
    {
    }

} 

public static class Program
{
   public void static Main()
   {
       var observable = new <....SomeClass....>();
       var sub1 = new Subscriber();
       var sub2 = new Subscriber();
       observable.Subscribe(sub1);
       observable.Subscribe(sub2);
       // some waiting function 
   }
}
Run Code Online (Sandbox Code Playgroud)

如果我使用Subject作为'SomeClass',那么在sub1的OnNext()完成之前,不会调用sub2的OnNext().如果sub1花了很多时间,我不希望它延迟sub2的接收.有人能告诉我Rx如何为SomeClass提供这种实现.

c# system.reactive

3
推荐指数
1
解决办法
3922
查看次数

反应性:试图了解Subject <T>的工作原理

试图了解如何Subject<T>,ReplaySubject<T>和其他工作.这是一个例子:

(主题是观察者观察者)

public IObservable<int> CreateObservable()
{
     Subject<int> subj = new Subject<int>();                // case 1
     ReplaySubject<int> subj = new ReplaySubject<int>();    // case 2

     Random rnd = new Random();
     int maxValue = rnd.Next(20);
     Trace.TraceInformation("Max value is: " + maxValue.ToString());

     subj.OnNext(-1);           // specific value

     for(int iCounter = 0; iCounter < maxValue; iCounter++)
     {
          Trace.TraceInformation("Value: " + iCounter.ToString() + " is about to publish");
          subj.OnNext(iCounter);
      }

      Trace.TraceInformation("Publish complete");
      subj.OnComplete();

      return subj;
 }

 public void Main()
 {
     //
     // …
Run Code Online (Sandbox Code Playgroud)

c# system.reactive

3
推荐指数
2
解决办法
3553
查看次数

复合观察

我有以下代码:

Observable.FromEvent<ModelEventArgs>(
        h => ValuesController.ModelAdded += h,
        h => ValuesController.ModelAdded -= h)
    .Subscribe(m => context.Connection.Broadcast(m));

Observable.FromEvent<ModelEventArgs>(
        h => ValuesController.ModelDeleted += h,
        h => ValuesController.ModelDeleted -= h)
    .Subscribe(m => context.Connection.Broadcast(m));
Run Code Online (Sandbox Code Playgroud)

什么会更清洁:

Observable.[SOMETHING](
        Observable.FromEvent<ModelEventArgs>(
                h => ValuesController.ModelAdded += h,
                h => ValuesController.ModelAdded -= h),
        Observable.FromEvent<ModelEventArgs>(
                h => ValuesController.ModelDeleted += h,
                h => ValuesController.ModelDeleted -= h))
    .Subscribe(m => context.Connection.Broadcast(m));
Run Code Online (Sandbox Code Playgroud)

我似乎无法弄清楚[SOMETHING]需要什么,我不想等待ModelAdded完成,我只是希望它们都为每个事件发出(m).

对我来说很容易,我是一个Rx newb

c# system.reactive

3
推荐指数
1
解决办法
450
查看次数

在Reactive Extensions中实现自定义Hardware-timer-basede调度程序

是否可以为响应式扩展实现基于自定义硬件定时器的调度程序?我怎么开始,有什么好的例子吗?

我有一个硬件可以每毫秒发送一个准确的中断.我想利用它来创建更精确的RX调度程序.

UPDATE

感谢Asti回答的关键字,我发现了这篇博文,这让我发现我可以实现VirtualTimeScheduler <TAbsolute,TRelative>,因为我的硬件设备为我提供了绝对的时间戳.

system.reactive

3
推荐指数
1
解决办法
219
查看次数

可观察的等待方法C#

Observable.FromAsyncPattern可用于从BeginX EndX样式的异步方法中创建一个observable.

也许我误解了一些事情,但是有一个类似的功能来从新的异步样式方法创建一个observable - 即.. Stream.ReadAsync?

c# system.reactive

3
推荐指数
1
解决办法
1219
查看次数

BlockingCollection vs Subject用作消费者

我正在尝试用C#实现一个消费者.有许多发布者可以同时执行.我创建了三个示例,一个使用Rx和subject,一个使用BlockingCollection,第三个使用BlockingCollection中的ToObservable.在这个简单的例子中,它们都做同样的事情,我希望它们与多个生产者一起工作.

每种方法有哪些不同的特质?

我已经在使用Rx,所以我更喜欢这种方法.但我担心OnNext没有线程安全保证,我不知道排队语义是什么主题和默认调度程序.

有线程安全主题吗?

是否要处理所有消息?

当这不起作用时还有其他任何情况吗?是同时处理?

void SubjectOnDefaultScheduler()
{
    var observable = new Subject<long>();
    observable.
        ObserveOn(Scheduler.Default).
        Subscribe(i => { DoWork(i); });
    observable.OnNext(1);
    observable.OnNext(2);
    observable.OnNext(3);
}
Run Code Online (Sandbox Code Playgroud)

不是Rx,但很容易适应使用/订阅它.它需要一个项目然后处理它.这应该是连续发生的.

void BlockingCollectionAndConsumingTask()
{
    var blockingCollection = new BlockingCollection<long>();
    var taskFactory = new TaskFactory();
    taskFactory.StartNew(() =>
    {
        foreach (var i in blockingCollection.GetConsumingEnumerable())
        {
            DoWork(i);
        }
    });
    blockingCollection.Add(1);
    blockingCollection.Add(2);
    blockingCollection.Add(3);
}
Run Code Online (Sandbox Code Playgroud)

使用阻塞集合有点像主题似乎是一个很好的妥协.我猜是隐式地会安排到任务,所以我可以使用async/await,这是正确的吗?

void BlockingCollectionToObservable()
{
    var blockingCollection = new BlockingCollection<long>();
    blockingCollection.
        GetConsumingEnumerable().
        ToObservable(Scheduler.Default).
        Subscribe(i => { DoWork(i); });
    blockingCollection.Add(1);
    blockingCollection.Add(2);
    blockingCollection.Add(3);
}
Run Code Online (Sandbox Code Playgroud)

c# subject task-parallel-library system.reactive blockingcollection

3
推荐指数
1
解决办法
1639
查看次数

BehaviorSubject.First()已经过时,有什么替代方案?

所以我有一个BehaviorSubject<string>我希望用于房产的.我一直在使用阻塞First()方法来获取BehaviorSubject属性getter中保持的当前值.

First()操作现在已经过时,那么实现这一目标的新方法是什么?

.net system.reactive

3
推荐指数
2
解决办法
1386
查看次数

带有参数关闭功能的Rx Observable Window

我正在尝试将observable分隔成窗口(或者为了我的目的,Buffers也很好),同时能够关闭自定义位置的窗口/缓冲区.

例如,我有一个observable,它从1开始生成整数并向上移动.我想在每个可被7整除的数字处关闭一个窗口.在这种情况下,我的结束函数需要将该项作为参数.

方法有一个重载Window:

Window<TSource, TWindowClosing>(IObservable<TSource>, Func<IObservable<TWindowClosing>>)
Run Code Online (Sandbox Code Playgroud)

要么不能使用这个重载,要么我无法绕过它.文档描述它完全符合我的要求,但没有显示示例.此外,它还显示了非确定性结束的示例,它取决于关闭可观察集合发出项目时的时间.

Window运算符将可观察序列分解为连续的非重叠窗口.当前窗口的结束和下一个窗口的开始由可观察序列控制,该序列是windowClosingSelect函数的结果,该函数作为输入参数传递给操作符.运算符可用于将一组事件分组到窗口中.例如,交易的状态可以是被观察的主要序列.这些州可能包括:准备,准备,活动和承诺/中止.主序列可以包括它们按顺序出现的所有状态.windowClosingSelect函数可以返回一个可观察的序列,该序列仅在Committed或Abort状态下生成一个值.这将关闭表示特定事务的事务事件的窗口.

我认为像下面这样的人会做这个工作,但我必须自己实施:

Window<TSource, TWindowClosing>(IObservable<TSource>, Func<TSource, bool>)
Run Code Online (Sandbox Code Playgroud)
  • 内置函数是否可以实现这种窗口化(我知道我可以自己构建一个)?
  • 一旦从窗口可观察的项目中发出一个项目,是否可以基于发出的项目关闭窗口或仅非确定性地关闭窗口?

system.reactive

3
推荐指数
1
解决办法
881
查看次数

在F#中混合IObservable和Async <'a>

我有IObservable一个库提供,它从外部服务侦听事件:

let startObservable () : IObservable<'a> = failwith "Given"
Run Code Online (Sandbox Code Playgroud)

对于每个收到的事件,我想执行一个返回的动作Async:

let action (item: 'a) : Async<unit> = failwith "Given"
Run Code Online (Sandbox Code Playgroud)

我正在尝试实现一个处理器

let processor () : Async<unit> =
    startObservable()
    |> Observable.mapAsync action
    |> Async.AwaitObservable
Run Code Online (Sandbox Code Playgroud)

我已经弥补mapAsync并且AwaitObservable:理想情况下它们将由一些图书馆提供,到目前为止我找不到它.

额外要求:

  • 应该按顺序执行操作,以便在处理上一个事件时缓冲后续事件.

  • 如果某个操作引发错误,我希望我的处理器完成.否则,它永远不会完成.

  • Async.Start应该尊重通过的取消令牌.

关于我应该使用的图书馆的任何提示?

f# asynchronous system.reactive

3
推荐指数
1
解决办法
239
查看次数