我有一个Action,它接受一个回调,一旦它完成就会调用泛型参数,即一个Action<Action<T>>.我想在操作开始时显示一个忙碌的微调器,然后在调用回调时将其取出,所以我创建了一个简单的实用程序来执行此操作.我遇到的问题是用户希望他们的回调在原始调用线程上运行,但并不总是如此.它几乎总是在单元测试(nUnit)中完美地工作,但在应用程序实际运行时不适用于某些调用(WPF,.Net 4).
以下是我所拥有的相关内容
void WrapAsyncCallbackPattern<T>(Action<T> callback, Action<Action<T>> actionToRun)
{
var subject = new AsyncSubject<T>();
try
{
actionToRun(
result =>
{
subject.OnNext(result);
subject.OnCompleted();
});
}
catch (Exception ex)
{
subject.OnError(ex);
}
subject
.ObserveOn(Scheduler.CurrentThread)
.Subscribe(callback, OnError);
}
Run Code Online (Sandbox Code Playgroud)
我想callback在我正在订阅的线程上运行(并且subject已声明)但它似乎没有可靠地执行.我认为我做的事情很傻.它是什么?
编辑:添加了单元测试代码
private readonly TimeSpan m_WaitTime = TimeSpan.FromSeconds(1);
[Test]
public void WrapAsyncCallbackPattern_WithActionOnDifferentThread_CallsCallbackOnSameThread()
{
var awaiter = new AutoResetEvent(false);
bool callbackRan = false;
int callingThreadId = Thread.CurrentThread.ManagedThreadId;
int callbackThreadId = int.MinValue;
int actionThreadId = int.MinValue;
BackgroundOperation.WrapAsyncCallbackPattern<int>(
_ =>
{
callbackRan …Run Code Online (Sandbox Code Playgroud) 我刚开始使用ReactiveUI.我有以下课程:
public class MainViewModel : ReactiveObject, IRoutableViewModel
{
private string shareText;
public ReactiveCollection<SharingAccountViewModel> SelectedAccounts { get; private set; }
public string ShareText
{
get { return shareText; }
set
{
this.RaiseAndSetIfChanged(ref shareText, value);
}
}
public IReactiveCommand ShareCommand { get; private set; }
}
Run Code Online (Sandbox Code Playgroud)
我想要做的是在以下条件为真时允许命令执行:
我尝试了以下但它不起作用,因为连接到命令的按钮永远不会启用:
ShareCommand = new ReactiveCommand(this.WhenAny(viewModel => viewModel.ShareText,
viewModel => viewModel.SelectedAccounts,
(x, y) => !String.IsNullOrEmpty(x.Value) && y.Value.Count > 0));
Run Code Online (Sandbox Code Playgroud)
如果我只检查ShareText属性,它可以正常工作:
ShareCommand = new ReactiveCommand(this.WhenAny(viewModel => viewModel.ShareText,
(x) => !String.IsNullOrEmpty(x.Value)));
Run Code Online (Sandbox Code Playgroud)
我查看了ReactiveUI问题的答案:将CanExecute与ReactiveCommand一起使用
基于此我尝试了以下内容: …
我正在等待流中的特定值,此时,我想在我的订阅者中做一些工作,并且还取消订阅正在进行的流.这样做的最佳语法是什么?
试图转换现有的数据访问代码,以异步和跨越的Rx来了,因为你不能返回Task<IEnumerable<T>>用yield return在你的方法体.
我写了这个,但不确定它的异步,所以指针感激不尽
public class EmployeeRepository : IEmployeeRepository
{
public IAsyncEnumerable<Employee> GetEmployees()
{
return Enumerable().ToAsyncEnumerable();
}
private IEnumerable<Employee> Enumerable()
{
using (var connection = new SqlConnection(ConfigurationManager.ConnectionStrings["DBConnString"].ConnectionString))
{
connection.Open();
using (var command = new SqlCommand(@"SELECT * FROM EMPLOYEES", connection))
{
using (var reader = command.ExecuteReader())
{
while (reader.Read())
{
yield return
new Employee()
{
Id = ReadField<int>(reader, "Id"),
Name = ReadField<string>(reader, "Name")
};
}
}
}
}
}
private static T ReadField<T>(IDataRecord reader, string fieldName)
{
var …Run Code Online (Sandbox Code Playgroud) 我有一个WPF应用程序需要处理许多小任务.这些小任务都是同时生成的,并添加到Dispatcher Queue中,优先级为Normal.同时显示忙碌指示符.结果是,尽管工作被分解为任务,但忙碌指示器实际上已冻结.
我尝试将这些任务的优先级更改为后台以查看是否已修复它,但仍然忙碌的指示器冻结.
我订阅了该Dispatcher.Hooks.OperationStarted事件,以查看在我的任务处理过程中是否发生了任何渲染作业,但他们没有.
有什么想法发生了什么?
一些技术细节:任务实际上只是来自Observable序列的消息,它们通过调用ReactiveUI来"排队"到调度程序中,ObserveOn(RxApp.MainThreadScheduler)它应该等同于ObserveOn(DispatcherScheduler).这些任务中的每一个的工作部分是通过ObserveOn调用订阅的代码,例如
IObservable<TaskMessage> incomingTasks;
incomingTasks.ObserveOn(RxApp.MainThreadScheduler).Subscribe(SomeMethodWhichDoesWork);
Run Code Online (Sandbox Code Playgroud)
在这个例子中,incomingTasks会连续产生3000多条消息,ObserveOn将每次调用SomeMethodWhichDoesWork推送到Dispatcher队列,以便稍后处理
我一直在这样做我的网页请求:
public IObservable<Foo> GetFoo(string fooId)
{
var uri = /* set up the GET query parameters here */;
return Observable.Create<Foo>(
obs => new HttpClient().GetAsync(uri)
.ToObservable()
.Subscribe(response =>
{
try
{
response.EnsureSuccessStatusCode();
response.Content.ReadAsStringAsync()
.ToObservable()
.Select(JObject.Parse)
.Select(json => new Foo(json))
.Subscribe(foo =>
{
obs.OnNext(foo);
obs.OnCompleted();
});
}
catch (Exception e)
{
obs.OnError(e);
}
}));
Run Code Online (Sandbox Code Playgroud)
}
但是,我觉得作为Rx的一个相对较新的人,我可能会错过一些简化我所写内容的方法.看起来外部Observable只是内部的一个包装,我可以(不知何故)揭露它,而不是 - 我只是不知道如何.
我试图ObservableForProperty多个反应观察序列.这是代码.
using ReactiveUI;
public ReactiveUIDerivedClass<T> FName {get; private set;}
public ReactiveUIDerivedClass<T> MName {get; private set;}
public ReactiveUIDerivedClass<T> LName {get; private set;}
FName = new ReactiveUIDerivedClass<T>();
MName = new ReactiveUIDerivedClass<T>();
LName = new ReactiveUIDerivedClass<T>();
Run Code Online (Sandbox Code Playgroud)
这里T是System.String.因此,以下方法有效.
private void ObserveForPropertyInThreeFields()
{
Observable.CombineLatest(FName.ObservableForProperty(p => p.Value),
MName.ObservableForProperty(p => p.Value),
LName.ObservableForProperty(p => p.Value))
.Where(p => p.All(v => !string.IsNullOrWhiteSpace(v.Value)))
.Subscribe(p => { /* do some stuff */ } );
}
Run Code Online (Sandbox Code Playgroud)
现在,我希望获得类似的结果,其中属性的T有三种不同的类型.当我使用Observable.CombineLatest时,我收到以下错误消息.
无法从用法中推断出方法'System.Reactive.Linq.Observable.CombineLatest(System.IObservable,System.IObservable,System.Func)'的类型参数.尝试显式指定类型参数.
有关ObserveForProperty如何更改不同类型的三个属性的任何想法?
让我先说:
我读过一些介绍内容,包括本介绍无框架由马修Podwysocki.
所以我从他的一个例子开始,写了一些像这样的鼠标拖动/绘图代码:
var leftMouseDown = Observable.FromEventPattern<MouseEventArgs>(mainCanvas, "MouseLeftButtonDown");
var leftMouseUp = Observable.FromEventPattern<MouseButtonEventArgs>(mainCanvas, "MouseLeftButtonUp");
var mouseMove = Observable.FromEventPattern<MouseEventArgs>(mainCanvas, "MouseMove");
var mouseMoves = from mm in mouseMove
let location = mm.EventArgs.GetPosition(mainCanvas)
select new { location.X, location.Y };
var mouseDiffs = mouseMoves.Skip(1).Zip(mouseMoves, (l, r) => new { X1 = l.X, Y1 = l.Y, X2 = r.X, Y2 = r.Y });
var mouseDrag = from _ in leftMouseDown
from md in mouseDiffs.TakeUntil(leftMouseUp)
select …Run Code Online (Sandbox Code Playgroud) 我想知道是否存在一种方法来获取可观察的流并使用* While运算符,尤其是TakeWhile,SkipWhile和BufferWhile,以便在满足bool“ while”条件时,它们的订阅者不会收到.OnComplete?
当我开始使用.TakeWhile / SkipWhile和BufferWhile运算符时,我假定它们不会终止/ .OnComplete(),而只是在满足布尔条件时才发出/不发出。
举个例子可能更有意义:
我有一个bool标志,指示一个实例是否忙,以及一个可观察的数据流:
private bool IsBusy { get;set; }
private bool IgnoreChanges { get;set; }
private IObservable<int> Producer { get;set; }
private IDisposable ConsumerSubscription { get;set; }
Run Code Online (Sandbox Code Playgroud)
..并像这样(简化)使用/设置RX流
private void SetupRx()
{
ConsumerSubscription = Producer
.SkipWhile(_ => IgnoreChanges == true) // Drop the producer's stream of ints whenever the IgnoreChanges flag is set to true, but forward them whenever the IgnoreChanges flag is set to false
.BufferWhile(_ => IsBusy == true) // for all …Run Code Online (Sandbox Code Playgroud) 所以我试图在使用Akavache的应用程序中测试缓存行为.我的测试看起来像这样:
using Akavache;
using Microsoft.Reactive.Testing;
using Moq;
using NUnit.Framework;
using ReactiveUI.Testing;
using System;
using System.Threading.Tasks;
[TestFixture]
public class CacheFixture
{
[Test]
public async Task CachingTest()
{
var scheduler = new TestScheduler();
// replacing the TestScheduler with the scheduler below works
// var scheduler = CurrentThreadScheduler.Instance;
var cache = new InMemoryBlobCache(scheduler);
var someApi = new Mock<ISomeApi>();
someApi.Setup(s => s.GetSomeStrings())
.Returns(Task.FromResult("helloworld")).Verifiable();
var apiWrapper = new SomeApiWrapper(someApi.Object, cache,
TimeSpan.FromSeconds(10));
var string1 = await apiWrapper.GetSomeStrings();
someApi.Verify(s => s.GetSomeStrings(), Times.Once());
StringAssert.AreEqualIgnoringCase("helloworld", string1);
scheduler.AdvanceToMs(5000);
// without …Run Code Online (Sandbox Code Playgroud) system.reactive ×10
c# ×6
.net ×3
reactiveui ×3
akavache ×1
asynchronous ×1
c#-4.0 ×1
dispatcher ×1
linq ×1
mvvm ×1
unit-testing ×1
wpf ×1