我想知道是否有办法弄清楚有多少观察者订阅了IObservable对象.
我有一个管理HashTable的过滤IObservable实例的类,我想实现一个"健康检查"例程,可以确定订阅者是否已被删除/处置,而每个订阅者都不必明确通知此类他们'完成(即应通过Dispose()或Unsubscribe()隐式).
这并没有真正回答这个问题 -
应该-I-使用-listiobserver或 - 简Actiont就是对保跟踪的-AN-的IObservable
任何想法Rx专家?
我有一个Subject<T>,我希望在订阅它时得到通知.我找不到这个机制.我错过了什么吗?
例如:
public class AccountManager
{
private ReplaySubject<string> _accountEvents = new ReplaySubject<string>();
public AccountManager()
{
}
public void Add(string val)
{
_accountEvents.OnNext(val);
}
public IObservable<string> AccountEvents { get { return _accountEvents.AsObservable<string>(); } }
}
Run Code Online (Sandbox Code Playgroud)
可以AccountManager在任何其他代码调用通知Subscribe的AccountEvents属性.
晕到那里,我一直在寻找RX框架的解决方案.我的C#4.0类将调用2种不同的方法并节省时间,我想并行执行.有没有办法使用Reactive Framework并行运行2种不同的方法?不仅要并行运行这两个方法,还要等待其他方法完成并合并两个结果.示例如下所示:
AccountClass ac = new AccountClass();
string val1 = ac.Method1();
bool val2 = ac.Method2();
Run Code Online (Sandbox Code Playgroud)
如何运行这两个方法并行运行并相互等待完成并在Subscription部分中将结果组合在一起?
我想使用Rx扩展来处理长文件绑定操作的并行化.
工作流程是这样的:
我的问题是:我应该使用什么Rx调度程序(或调度程序的组合)?
当我这样做:
testScheduler.Schedule("Hello world",(scheduler, state) => Console.WriteLine(state));
testScheduler.AdvanceTo(testScheduler.Now);
Run Code Online (Sandbox Code Playgroud)
我点击此代码VirtualTimeSchedulerBase:
public void AdvanceTo(TAbsolute time)
{
int num = this.Comparer.Compare(time, this.Clock);
if (num < 0)
throw new ArgumentOutOfRangeException("time");
if (num == 0)
return;
Run Code Online (Sandbox Code Playgroud)
num == 0 是的,我退出方法.
我可以打电话testScheduler.Start(),我的行动将会执行.但随后TestScheduler将继续执行其队列中的所有内容.而我希望它在当前时间停止执行操作.
我在TestScheduler上看不到任何其他方法可以让我获得我想要的行为.
这是一个错误,还是正确的行为,但我错过了什么?
编辑:
我误解了.TestScheduler直到它们被安排的日期之后才执行操作.
调度操作会立即将其调度为当前值testScheduler.Now.所以直到它才会被执行Now + 1.
var testScheduler = new TestScheduler();
var due = new DateTime();
testScheduler.Schedule("Hello world", due, (scheduler, s) =>
{
Console.WriteLine(s);
return Disposable.Empty;
});
testScheduler.AdvanceTo(due.Ticks);
// Nothing …Run Code Online (Sandbox Code Playgroud) 我有以下使用FSharp.Reactive的F#代码.函数reactToEvents接受两个事件源并生成一个应用程序状态源.然后UI使用应用程序状态
open FSharp.Reactive
let reactToEvents (initialSettings:Settings)
(incomingTweets: ITweet IObservable)
(uiActions: UserInput IObservable) : State IObservable =
let initialState = { settings = initialSettings; tweets = [] }
let tweetInput = Observable.map TweetReceived incomingTweets
let userInput = Observable.map UserAction uiActions
let allInput = Observable.merge userInput tweetInput
Observable.scan transition initialState allInput
Run Code Online (Sandbox Code Playgroud)
在上面的函数中,在UI线程上生成uiActions时,在一个线程上生成incomingTweets事件.
像我上面那样使用Observable.merge合并两个源是否安全?
使用上面的Observable.scan扫描生成的源是否安全?
如果这不正确,那么正确的方法是什么?
谢谢!
更新1:
我希望至少Observable.merge不关心线程.我发现了这个,似乎说它们使用起来并不安全:
http://msdn.microsoft.com/en-us/library/ee353488.aspx
http://msdn.microsoft.com/en-us/library/ee353749.aspx
"对于每个观察者来说,注册的中间观察对象不是线程安全的.也就是说,源不会在不同的线程上同时触发源的观察."
那么正确的方法是什么呢?
更新2:
这是一个混乱.我链接的文档是用于从名称空间Microsoft.FSharp.Control.Observable合并和扫描的函数.这与我在我的代码中使用的Rx不同.对于真正的Rx,您需要使用库FSharp.Reactive和FSharp.Reactive.Observable下的函数.
我想在命令执行之前设置忙标志和状态栏文本,并在完成后重置标志和文本.我的工作代码在这里:
Cmd = ReactiveCommand.Create();
Cmd.Subscribe(async _ =>
{
IsBusy = true;
StatusBarText = "Doing job...";
try
{
var organization = await DoAsyncJob();
//do smth w results
}
finally
{
IsBusy = false;
StatusBarText = "Ready";
});
Run Code Online (Sandbox Code Playgroud)
是否有可能以"正确的方式"做到这一点?像这样:
Cmd = ReactiveCommand.CreateAsyncTask(_ => DoAsyncJob());
//how to do pre-action?
//is exists more beautiful way to to post-action?
Cmd.Subscribe(res =>
{
try
{
//do smth w results
}
finally
{
IsBusy = false;
StatusBarText = "Ready";
}
});
Run Code Online (Sandbox Code Playgroud)
或这个:
Cmd = ReactiveCommand.CreateAsyncTask(_ => DoAsyncJob()); …Run Code Online (Sandbox Code Playgroud) 是否有某些版本的Observable.Delay或Observable.Buffer不为其计时器使用新线程?也许精度较低..
我有一个场景,我需要在一个observable上调用Observable.Delay,每秒产生几千条消息,这会产生很多线程.
谢谢.
我正在替换rx的Observable.Interval上的计时器,我遇到了一个问题.我不知道如何暂停这样的计时器.我不是指暂停订阅,而是暂停和恢复计时.我知道如何以肮脏的方式做到这一点,但我想知道更好的解决方案.
我目前的代码:
var RemainingTimes = Observable
.Interval(TimeSpan.FromMilliseconds(refreshInterval))
.Select(t => _provider.Duration - TimeSpan.FromMilliseconds(t * refreshInterval))
Run Code Online (Sandbox Code Playgroud) 签署这样的签名的目的是什么Observable.FromEvent?对于例如:
var appActivated = Observable.FromEvent(
h => Application.Current.Activated += h,
h => Application.Current.Activated -= h);
Run Code Online (Sandbox Code Playgroud)
特别是什么h?为什么+=,然后-=?我们Observable从事件或事件处理程序制作?如果从事件,为什么不只是有一个签名,如:
var appActivated = Observable.FromEvent(Application.Current.Activated);
Run Code Online (Sandbox Code Playgroud)