标签: system.reactive

ObserveOn(Scheduler.CurrentThread)不会导致订阅的Action在原始线程上运行

我有一个Action,它接受一个回调,一旦它完成就会调用泛型参数,即一个Action<Action<T>>.我想在操作开始时显示一个忙碌的微调器,然后在调用回调时将其取出,所以我创建了一个简单的实用程序来执行此操作.我遇到的问题是用户希望他们的回调在原始调用线程上运行,但并不总是如此.它几乎总是在单元测试(nUnit)中完美地工作,但在应用程序实际运行时不适用于某些调用(WPF,.Net 4).

以下是我所拥有的相关内容

void WrapAsyncCallbackPattern<T>(Action<T> callback, Action<Action<T>> actionToRun)
{
    var subject = new AsyncSubject<T>();
    try
    {
        actionToRun(
            result =>
            {
                subject.OnNext(result);
                subject.OnCompleted();
            });
    }
    catch (Exception ex)
    {
        subject.OnError(ex);
    }

    subject
        .ObserveOn(Scheduler.CurrentThread)
        .Subscribe(callback, OnError);
}
Run Code Online (Sandbox Code Playgroud)

我想callback在我正在订阅的线程上运行(并且subject已声明)但它似乎没有可靠地执行.我认为我做的事情很傻.它是什么?

编辑:添加了单元测试代码

private readonly TimeSpan m_WaitTime = TimeSpan.FromSeconds(1);

[Test]
public void WrapAsyncCallbackPattern_WithActionOnDifferentThread_CallsCallbackOnSameThread()
{
    var awaiter = new AutoResetEvent(false);

    bool callbackRan = false;
    int callingThreadId = Thread.CurrentThread.ManagedThreadId;
    int callbackThreadId = int.MinValue;
    int actionThreadId = int.MinValue;

    BackgroundOperation.WrapAsyncCallbackPattern<int>(
        _ =>
        {
            callbackRan …
Run Code Online (Sandbox Code Playgroud)

system.reactive c#-4.0

1
推荐指数
1
解决办法
1096
查看次数

具有CanExecute组合标准的ReactiveCommand

我刚开始使用ReactiveUI.我有以下课程:

public class MainViewModel : ReactiveObject, IRoutableViewModel 
{
   private string shareText;

   public ReactiveCollection<SharingAccountViewModel> SelectedAccounts { get; private set; }

   public string ShareText 
   { 
      get { return shareText; }
      set
      {
          this.RaiseAndSetIfChanged(ref shareText, value);
      }
   }

   public IReactiveCommand ShareCommand { get; private set; }
}
Run Code Online (Sandbox Code Playgroud)

我想要做的是在以下条件为真时允许命令执行:

  1. ShareText属性不为null或为空
  2. SelectedAccounts集合包含至少一个值

我尝试了以下但它不起作用,因为连接到命令的按钮永远不会启用:

ShareCommand = new ReactiveCommand(this.WhenAny(viewModel => viewModel.ShareText,
  viewModel => viewModel.SelectedAccounts,
  (x, y) => !String.IsNullOrEmpty(x.Value) && y.Value.Count > 0));
Run Code Online (Sandbox Code Playgroud)

如果我只检查ShareText属性,它可以正常工作:

  ShareCommand = new ReactiveCommand(this.WhenAny(viewModel => viewModel.ShareText,
            (x) => !String.IsNullOrEmpty(x.Value)));
Run Code Online (Sandbox Code Playgroud)

我查看了ReactiveUI问题的答案:将CanExecute与ReactiveCommand一起使用

基于此我尝试了以下内容: …

c# system.reactive reactiveui

1
推荐指数
1
解决办法
1675
查看次数

RX语法要等到收到特定值,然后取消,然后取消订阅?

我正在等待流中的特定值,此时,我想在我的订阅者中做一些工作,并且还取消订阅正在进行的流.这样做的最佳语法是什么?

.net system.reactive

1
推荐指数
1
解决办法
603
查看次数

使用Reactive Extensions - 这是异步吗?

试图转换现有的数据访问代码,以异步和跨越的Rx来了,因为你不能返回Task<IEnumerable<T>>yield return在你的方法体.

我写了这个,但不确定它的异步,所以指针感激不尽

public class EmployeeRepository : IEmployeeRepository
{
    public IAsyncEnumerable<Employee> GetEmployees()
    {
        return Enumerable().ToAsyncEnumerable();
    }

    private IEnumerable<Employee> Enumerable()
    {
        using (var connection = new SqlConnection(ConfigurationManager.ConnectionStrings["DBConnString"].ConnectionString))
        {
            connection.Open();
            using (var command = new SqlCommand(@"SELECT * FROM EMPLOYEES", connection))
            {
                using (var reader = command.ExecuteReader())
                {
                    while (reader.Read())
                    {
                        yield return
                            new Employee()
                                {
                                    Id = ReadField<int>(reader, "Id"),
                                    Name = ReadField<string>(reader, "Name")
                                };
                    }
                }
            }
        }
    }

    private static T ReadField<T>(IDataRecord reader, string fieldName)
    {
        var …
Run Code Online (Sandbox Code Playgroud)

.net c# asynchronous system.reactive

1
推荐指数
1
解决办法
1982
查看次数

处理大量小任务并保持UI响应

我有一个WPF应用程序需要处理许多小任务.这些小任务都是同时生成的,并添加到Dispatcher Queue中,优先级为Normal.同时显示忙碌指示符.结果是,尽管工作被分解为任务,但忙碌指示器实际上已冻结.

我尝试将这些任务的优先级更改为后台以查看是否已修复它,但仍然忙碌的指示器冻结.

我订阅了该Dispatcher.Hooks.OperationStarted事件,以查看在我的任务处理过程中是否发生了任何渲染作业,但他们没有.

有什么想法发生了什么?

一些技术细节:任务实际上只是来自Observable序列的消息,它们通过调用ReactiveUI来"排队"到调度程序中,ObserveOn(RxApp.MainThreadScheduler)它应该等同于ObserveOn(DispatcherScheduler).这些任务中的每一个的工作部分是通过ObserveOn调用订阅的代码,例如

IObservable<TaskMessage> incomingTasks;
incomingTasks.ObserveOn(RxApp.MainThreadScheduler).Subscribe(SomeMethodWhichDoesWork);
Run Code Online (Sandbox Code Playgroud)

在这个例子中,incomingTasks会连续产生3000多条消息,ObserveOn将每次调用SomeMethodWhichDoesWork推送到Dispatcher队列,以便稍后处理

wpf dispatcher system.reactive reactiveui

1
推荐指数
1
解决办法
1392
查看次数

我可以简化使用Observable.Create从json请求返回对象

我一直在这样做我的网页请求:

public IObservable<Foo> GetFoo(string fooId)
{
    var uri = /* set up the GET query parameters here */;

    return Observable.Create<Foo>(
        obs => new HttpClient().GetAsync(uri)
            .ToObservable()
            .Subscribe(response =>
            {
                try
                {
                    response.EnsureSuccessStatusCode();

                    response.Content.ReadAsStringAsync()
                        .ToObservable()
                        .Select(JObject.Parse)
                        .Select(json => new Foo(json))
                        .Subscribe(foo =>
                        {
                            obs.OnNext(foo);
                            obs.OnCompleted();
                        });
                }
                catch (Exception e)
                {
                    obs.OnError(e);
                }
            }));
Run Code Online (Sandbox Code Playgroud)

}

但是,我觉得作为Rx的一个相对较新的人,我可能会错过一些简化我所写内容的方法.看起来外部Observable只是内部的一个包装,我可以(不知何故)揭露它,而不是 - 我只是不知道如何.

c# system.reactive

1
推荐指数
1
解决办法
1350
查看次数

ObservableForProperty用于多个Reactive Observable Sequence

我试图ObservableForProperty多个反应观察序列.这是代码.

using ReactiveUI;

public ReactiveUIDerivedClass<T> FName {get; private set;}
public ReactiveUIDerivedClass<T> MName {get; private set;}
public ReactiveUIDerivedClass<T> LName {get; private set;}

FName = new ReactiveUIDerivedClass<T>();
MName = new ReactiveUIDerivedClass<T>();
LName = new ReactiveUIDerivedClass<T>();
Run Code Online (Sandbox Code Playgroud)

这里T是System.String.因此,以下方法有效.

private void ObserveForPropertyInThreeFields()
{
  Observable.CombineLatest(FName.ObservableForProperty(p => p.Value),
                           MName.ObservableForProperty(p => p.Value),
                           LName.ObservableForProperty(p => p.Value))
            .Where(p => p.All(v => !string.IsNullOrWhiteSpace(v.Value)))
            .Subscribe(p => { /* do some stuff */ } );
}
Run Code Online (Sandbox Code Playgroud)

现在,我希望获得类似的结果,其中属性的T有三种不同的类型.当我使用Observable.CombineLatest时,我收到以下错误消息.

无法从用法中推断出方法'System.Reactive.Linq.Observable.CombineLatest(System.IObservable,System.IObservable,System.Func)'的类型参数.尝试显式指定类型参数.

有关ObserveForProperty如何更改不同类型的三个属性的任何想法?

c# mvvm system.reactive reactiveui

1
推荐指数
1
解决办法
1473
查看次数

使用Observable/Reactive Extensions检测鼠标移动的开始和停止?

让我先说:

  1. 我对Rx完全不熟悉
  2. 我没有使用C#/ Linq练习
  3. 我正在进行实验/学习,因此这个问题没有应用

我读过一些介绍内容,包括本介绍无框架马修Podwysocki.

所以我从他的一个例子开始,写了一些像这样的鼠标拖动/绘图代码:

var leftMouseDown = Observable.FromEventPattern<MouseEventArgs>(mainCanvas, "MouseLeftButtonDown");
var leftMouseUp = Observable.FromEventPattern<MouseButtonEventArgs>(mainCanvas, "MouseLeftButtonUp");
var mouseMove = Observable.FromEventPattern<MouseEventArgs>(mainCanvas, "MouseMove");

var mouseMoves = from mm in mouseMove
                 let location = mm.EventArgs.GetPosition(mainCanvas)
                 select new { location.X, location.Y };

var mouseDiffs = mouseMoves.Skip(1).Zip(mouseMoves, (l, r) => new { X1 = l.X, Y1 = l.Y, X2 = r.X, Y2 = r.Y });

var mouseDrag = from _ in leftMouseDown
                from md in mouseDiffs.TakeUntil(leftMouseUp)
                select …
Run Code Online (Sandbox Code Playgroud)

c# linq system.reactive

1
推荐指数
1
解决办法
368
查看次数

“无止境” TakeWhile,BufferWhile和SkipWhile RX.Net序列

我想知道是否存在一种方法来获取可观察的流并使用* While运算符,尤其是TakeWhile,SkipWhile和BufferWhile,以便在满足bool“ while”条件时,它们的订阅者不会收到.OnComplete?

当我开始使用.TakeWhile / SkipWhile和BufferWhile运算符时,我假定它们不会终止/ .OnComplete(),而只是在满足布尔条件时才发出/不发出。

举个例子可能更有意义:

我有一个bool标志,指示一个实例是否忙,以及一个可观察的数据流:

private bool IsBusy { get;set; }
private bool IgnoreChanges { get;set; }

private IObservable<int> Producer { get;set; }
private IDisposable ConsumerSubscription { get;set; }
Run Code Online (Sandbox Code Playgroud)

..并像这样(简化)使用/设置RX流

private void SetupRx()
{
    ConsumerSubscription = Producer
        .SkipWhile(_ => IgnoreChanges == true) // Drop the producer's stream of ints whenever the IgnoreChanges flag is set to true, but forward them whenever the IgnoreChanges flag is set to false
        .BufferWhile(_ => IsBusy == true) // for all …
Run Code Online (Sandbox Code Playgroud)

.net reactive-programming system.reactive

1
推荐指数
1
解决办法
372
查看次数

使用TestScheduler对Akavache的单元测试缓存行为

所以我试图在使用Akavache的应用程序中测试缓存行为.我的测试看起来像这样:

using Akavache;
using Microsoft.Reactive.Testing;
using Moq;
using NUnit.Framework;
using ReactiveUI.Testing;
using System;
using System.Threading.Tasks;

[TestFixture]
public class CacheFixture
{
    [Test]
    public async Task CachingTest()
    {
        var scheduler = new TestScheduler();
        // replacing the TestScheduler with the scheduler below works
        // var scheduler = CurrentThreadScheduler.Instance;
        var cache = new InMemoryBlobCache(scheduler);

        var someApi = new Mock<ISomeApi>();
        someApi.Setup(s => s.GetSomeStrings())
            .Returns(Task.FromResult("helloworld")).Verifiable();
        var apiWrapper = new SomeApiWrapper(someApi.Object, cache,
            TimeSpan.FromSeconds(10));

        var string1 = await apiWrapper.GetSomeStrings();
        someApi.Verify(s => s.GetSomeStrings(), Times.Once());
        StringAssert.AreEqualIgnoringCase("helloworld", string1);

        scheduler.AdvanceToMs(5000);
        // without …
Run Code Online (Sandbox Code Playgroud)

c# unit-testing system.reactive akavache

1
推荐指数
1
解决办法
678
查看次数