标签: system.reactive

使用FromAsyncPattern进行单元测试

Reactive Extensions有一个性感的小钩子来简化调用异步方法:

var func = Observable.FromAsyncPattern<InType, OutType>(
    myWcfService.BeginDoStuff,
    myWcfService.EndDoStuff);

func(inData).ObserveOnDispatcher().Subscribe(x => Foo(x));
Run Code Online (Sandbox Code Playgroud)

我在WPF项目中使用它,它在运行时运行良好.

不幸的是,当尝试使用这种技术的单元测试方法时,我遇到了随机故障.包含此代码的测试的每五次执行中约有3次失败.

这是一个示例测试(使用Rhino/unity自动模拟容器实现):

[TestMethod()]
public void SomeTest()
{
   // arrange
   var container = GetAutoMockingContainer();

   container.Resolve<IMyWcfServiceClient>()
      .Expect(x => x.BeginDoStuff(null, null, null))
      .IgnoreArguments()
      .Do(
         new Func<Specification, AsyncCallback, object, IAsyncResult>((inData, asyncCallback, state) =>
            {
               return new CompletedAsyncResult(asyncCallback, state);
             }));

   container.Resolve<IRepositoryServiceClient>()
      .Expect(x => x.EndDoStuff(null))
      .IgnoreArguments()
      .Do(
         new Func<IAsyncResult, OutData>((ar) =>
         {
            return someMockData;
         }));

   // act
   var target = CreateTestSubject(container);

   target.DoMethodThatInvokesService();

   // Run the dispatcher for everything over background priority
   Dispatcher.CurrentDispatcher.Invoke(DispatcherPriority.Background, new …
Run Code Online (Sandbox Code Playgroud)

.net c# wpf unit-testing system.reactive

2
推荐指数
1
解决办法
1595
查看次数

ReactiveUI:将CanExecute与ReactiveCommand一起使用

我开始在Silverlight项目上使用ReactiveUI框架,需要一些使用ReactiveCommands的帮助.

在我的视图模型中,我有一些看起来大致相似的东西(这只是一个简化的例子):

public class MyViewModel : ReactiveObject
{
  private int MaxRecords = 5;

  public ReactiveCommand AddNewRecord { get; protected set; }

  private ObservableCollection<string> _myCollection = new ObservableCollection<string>();
  public ObservableCollection<string> MyCollection
  {
    get
    {
      return _myCollection;
    }

    set
    {
      _myCollection = value;
      raiseCollectionChanged("MyCollection");
    }
  }

  MyViewModel()
  {
    var canAddRecords = Observable.Return<bool>(MyCollection.Count < MaxRecords);
    AddNewRecord = new ReactiveCommand(canAddRecords);

    AddNewRecord.Subscribe(x => 
    {
       MyCollection.Add("foo");
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

canAddRecords函数在第一次ReactiveCommand创建时进行评估,但是在添加项目时不会重新评估该函数MyCollection.任何人都可以向我展示如何绑定a的canExecute属性ReactiveCommand以便在这种情况下自动重新评估的一个很好的例子吗?

silverlight mvvm system.reactive reactiveui

2
推荐指数
1
解决办法
3458
查看次数

在Rx中从数组创建热的observable

我怎样才能做到这一点?

我有一个代码:

new int[]{1,2,3}.ToObservable().Subscribe(myObserver);
Run Code Online (Sandbox Code Playgroud)

问题是这第一个调用是一个冷可观察的,所以在另一个调用上这样:

new int[]{4,5,6}.ToObservable().Subscribe(myObserver);
Run Code Online (Sandbox Code Playgroud)

myObserver根本不会触发onNext.显然是因为第一个电话发布了1,2,3, END.我不希望observable调用"END",因为我想继续订阅.是否有一个功能可以轻松地为我做这个?

c# reactive-programming system.reactive

2
推荐指数
1
解决办法
3503
查看次数

Reactive Extensions v2中的TestScheduler如何工作

我一直在读这个:使用虚拟时间调度测试Rx查询

我到了"使用单元测试项目"部分(大约在页面的一半)并自己尝试(使用VS2012和MSTest),但我的结果与文档中的结果不同.具体来说,我的断言失败了.

这是我的测试代码:

[TestMethod]
public void TestMethod1()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateColdObservable(
        OnNext(10, 42),
        OnCompleted<int>(20));

    var observer1 = scheduler.CreateObserver<int>();
    scheduler.Schedule(TimeSpan.FromTicks(190), 
                       (s, t) => xs.Subscribe(observer1));

    var observer2 = scheduler.CreateObserver<int>();
    scheduler.Schedule(TimeSpan.FromTicks(220), 
                       (s, t) => xs.Subscribe(observer2));

    scheduler.Start();

    observer1.Messages.AssertEqual(
        OnNext(200, 42),
        OnCompleted<int>(210));

    observer2.Messages.AssertEqual(
        OnNext(230, 42),
        OnCompleted<int>(240));
}
Run Code Online (Sandbox Code Playgroud)

我必须做的一个改变是:

scheduler.Schedule(TimeSpan.FromTicks(190),
                   () => xs.Subscribe(observer1));
Run Code Online (Sandbox Code Playgroud)

成为:

scheduler.Schedule(TimeSpan.FromTicks(190), 
                   (s, t) => xs.Subscribe(observer1));
Run Code Online (Sandbox Code Playgroud)

说实话,我没有意识到这种变化的影响(我没有在C#中编写那么久!).

以下是结果:

Assert.Fail failed. 
Expected: [OnNext(42)@200, OnCompleted()@210]
Actual..: [OnNext(42)@11, OnCompleted()@21]
Run Code Online (Sandbox Code Playgroud)

我理解它的方式是我创建一个冷可观察量,它将在第10个刻度上返回值42,并且序列在第20个刻度处完成.然后,我创建和观察者订阅这个观察者,表示我希望计划从标记190开始.因此,在标记200(190 + 10)处测试值42,在标记210处测试完成时间.然而,我得到标记11和分别为21.

该文档基于V2的RTM之前,因此我不确定TestMcheduler的行为是否已在RTM中更改,或者我是否在某处犯了错误.如果有人能解释发生了什么,我会很感激.

unit-testing system.reactive

2
推荐指数
1
解决办法
1615
查看次数

使用Subject来解耦Observable订阅和初始化

我有一个暴露IObservable状态的API .但是这种状态取决于必须通过初始化的潜在可观察源Init.

我想做的是保护用户不必按正确的顺序执行操作:就像目前的情况一样,如果他们Status在执行之前尝试订阅,则会Init因为源未初始化而获得异常.

所以我有一个天才的想法,用一个Subject来解耦这两个:订阅我的外部用户Status只是订阅主题,然后当他们调用时Init,我使用我的主题订阅底层服务.

代码中的想法

private ISubject<bool> _StatusSubject = new Subject<bool>();
public IObservable<bool> Status { get { return _StatusSubject; } }

public void Init() 
{
    _Connection = new Connection();
    Underlying.GetDeferredObservable(_Connection).Subscribe(_StatusSubject);
}
Run Code Online (Sandbox Code Playgroud)

但是,通过对虚拟项目的测试,问题是初始化通过订阅主题来"唤醒"我的基础Observable,即使没有人订阅该主题.如果可能的话,这是我想避免的,但我不确定如何......

(我也注意到所接受的智慧,"一般规则是,如果你正在使用一个主题,那么你做错了什么")

c# subject system.reactive

2
推荐指数
1
解决办法
1105
查看次数

吞咽IObservable异常

是否有可能设计一个Rx运算符,它以相同的顺序吞下重复的异常(与Retry或Catch不同)?本质上与.Select(x => x)相同,但在此过程中忽略异常.(我知道这是违反准则的).

exception system.reactive

2
推荐指数
1
解决办法
145
查看次数

如何在长时间运行的查询中间扩展Throttle Timespan?

是否可以在查询中间扩展Throttle Timespan值?例如,假设一个例子,如101 Rx Samples Throttle,就有这个查询var throttled = observable.Throttle(TimeSpan.FromMilliseconds(750));.

如果我想要改变它以便如果在前500毫秒期间将没有事件,那么对于之后的每个事件,节流值将被扩展到例如1500毫秒.

这是一个使用Switch运营商的地方吗?

c# reactive-programming system.reactive

2
推荐指数
1
解决办法
472
查看次数

为什么我的RX链阻塞?

所以我有以下RX更改,但它似乎阻止选择,好像要保留顺序.我的理解是它应该继续委托给任务池?

var observable = Observable.Interval(TimeSpan.FromMilliseconds(10));

observable.ObserveOn(Scheduler.TaskPool)
    .Select(
    i =>
    {
        Console.WriteLine("Here" + System.Threading.Thread.CurrentThread.ManagedThreadId);
        System.Threading.Thread.Sleep(5000);
        return i;
    })
    .ObserveOn(Scheduler.TaskPool)
    .SubscribeOn(Scheduler.TaskPool)
    .Subscribe(i => { Console.WriteLine(i); });
Run Code Online (Sandbox Code Playgroud)

c# system.reactive

2
推荐指数
1
解决办法
391
查看次数

我应该使用dispatcher.Invoke(...)还是ObserveOn(调度程序)?

我已经获得了一些代码来处理调度程序在其构造函数中传递给视图模型的位置.我现在想知道我是否应该使用ObserveOn(dispatcher),或者dispatcher.Invoke(...)当我想在UI线程上执行某些操作时.

例如,我可以这样做:

this.WhenAny(me => me.SomeValue, _ => Unit.Default)
.ObserveOn(dispatcher)
.Subscribe(_ => SomeMethod());

...

private void SomeMethod()
{
    //do some stuff
}
Run Code Online (Sandbox Code Playgroud)

或者我可以这样做:

this.WhenAny(me => me.SomeValue, _ => Unit.Default)
.Subscribe(_ => SomeMethod());
Run Code Online (Sandbox Code Playgroud)

这意味着我可以这样做:

private void SomeMethod()
{
    dispatcher.Invoke(new Action(() =>{//do some stuff});
}
Run Code Online (Sandbox Code Playgroud)

两者之间有什么重大区别吗?

我担心的是,如果我想在代码的其他部分调用SomeMethod(),这不是由SomeValue更改引发的?然后我需要做dispatcher.invoke(new Action(() => someMethod()));,这让我觉得使用dispatcher.Invoke(...)内部SomeMethod是最好的选择.

这是好事还是坏事?在那一刻,我正在均匀地使用这两种技术.我打算转到其中一个,但想先知道正确的方法.

c# system.reactive reactiveui

2
推荐指数
1
解决办法
415
查看次数

Action的结果是什么?如何使用它?

Reactive Extensions有一个Observable.Start()功能.

Visual Studio告诉我它

通过可观察的序列异步调用结果.

它有返回类型 IObservable<Unit>

MDSN所说的Unit就是它

代表无效.

但为什么我需要代表void

为什么我需要将voids 流转换为list并从中获取第一个元素来替换并行ForEach

c# system.reactive

2
推荐指数
1
解决办法
452
查看次数