标签: system.reactive

如何确定IObservable <T>上有多少/清除订户?

我想知道是否有办法弄清楚有多少观察者订阅了IObservable对象.

我有一个管理HashTable的过滤IObservable实例的类,我想实现一个"健康检查"例程,可以确定订阅者是否已被删除/处置,而每个订阅者都不必明确通知此类他们'完成(即应通过Dispose()或Unsubscribe()隐式).

这并没有真正回答这个问题 -

应该-I-使用-listiobserver或 - 简Actiont就是对保跟踪的-AN-的IObservable

任何想法Rx专家?

c# system.reactive

2
推荐指数
1
解决办法
692
查看次数

Reactive Extensions IObservable在订阅者订阅时通知

我有一个Subject<T>,我希望在订阅它时得到通知.我找不到这个机制.我错过了什么吗?

例如:

    public class AccountManager
    {
        private ReplaySubject<string> _accountEvents = new ReplaySubject<string>();

        public AccountManager()
        {
        }

        public void Add(string val)
        {
            _accountEvents.OnNext(val);
        }

        public IObservable<string> AccountEvents { get { return _accountEvents.AsObservable<string>(); } }

    }
Run Code Online (Sandbox Code Playgroud)

可以AccountManager在任何其他代码调用通知SubscribeAccountEvents属性.

.net system.reactive

2
推荐指数
1
解决办法
332
查看次数

使用Reactive以并行方式运行方法

晕到那里,我一直在寻找RX框架的解决方案.我的C#4.0类将调用2种不同的方法并节省时间,我想并行执行.有没有办法使用Reactive Framework并行运行2种不同的方法?不仅要并行运行这两个方法,还要等待其他方法完成并合并两个结果.示例如下所示:

AccountClass ac = new AccountClass();    
string val1 = ac.Method1();  
bool val2 = ac.Method2();
Run Code Online (Sandbox Code Playgroud)

如何运行这两个方法并行运行并相互等待完成并在Subscription部分中将结果组合在一起?

.net c# parallel-processing system.reactive

2
推荐指数
1
解决办法
1562
查看次数

线程调度程序,可以在同一个驱动器上批量文件操作到同一个线程?

我想使用Rx扩展来处理长文件绑定操作的并行化.

工作流程是这样的:

  • 在多个驱动器上搜索给定的文件模式(假设每个驱动器都在一个单独的物理设备上)
  • 对于找到的每个匹配文件,将长文件操作排队到与同一驱动器上的其他文件相同的线程 - 希望最小化随机搜索.
  • 对不同驱动器上的文件的操作应排队到不同的线程以允许并行处理.

我的问题是:我应该使用什么Rx调度程序(或调度程序的组合)?

.net parallel-processing system.reactive

2
推荐指数
1
解决办法
126
查看次数

Rx .Net TestScheduler-执行立即安排的事件

当我这样做:

testScheduler.Schedule("Hello world",(scheduler, state) => Console.WriteLine(state));
testScheduler.AdvanceTo(testScheduler.Now);
Run Code Online (Sandbox Code Playgroud)

我点击此代码VirtualTimeSchedulerBase:

public void AdvanceTo(TAbsolute time)
{
  int num = this.Comparer.Compare(time, this.Clock);
  if (num < 0)
    throw new ArgumentOutOfRangeException("time");
  if (num == 0)
    return;
Run Code Online (Sandbox Code Playgroud)

num == 0 是的,我退出方法.

我可以打电话testScheduler.Start(),我的行动将会执行.但随后TestScheduler将继续执行其队列中的所有内容.而我希望它在当前时间停止执行操作.

我在TestScheduler上看不到任何其他方法可以让我获得我想要的行为.

这是一个错误,还是正确的行为,但我错过了什么?

编辑:

我误解了.TestScheduler直到它们被安排的日期之后才执行操作.

调度操作会立即将其调度为当前值testScheduler.Now.所以直到它才会被执行Now + 1.

  var testScheduler = new TestScheduler();
  var due = new DateTime();
    testScheduler.Schedule("Hello world", due, (scheduler, s) =>
  {
    Console.WriteLine(s);
    return Disposable.Empty;
  });
  testScheduler.AdvanceTo(due.Ticks);
  // Nothing …
Run Code Online (Sandbox Code Playgroud)

.net c# system.reactive

2
推荐指数
1
解决办法
1542
查看次数

F#Rx扩展具有并发事件的IObservable

我有以下使用FSharp.Reactive的F#代码.函数reactToEvents接受两个事件源并生成一个应用程序状态源.然后UI使用应用程序状态

open FSharp.Reactive

let reactToEvents (initialSettings:Settings) 
            (incomingTweets: ITweet IObservable) 
            (uiActions: UserInput IObservable) :  State IObservable = 
    let initialState = { settings = initialSettings; tweets = [] }

    let tweetInput = Observable.map TweetReceived incomingTweets 
    let userInput = Observable.map UserAction uiActions
    let allInput = Observable.merge userInput tweetInput

    Observable.scan transition initialState allInput
Run Code Online (Sandbox Code Playgroud)

在上面的函数中,在UI线程上生成uiActions时,在一个线程上生成incomingTweets事件.

像我上面那样使用Observable.merge合并两个源是否安全?

使用上面的Observable.scan扫描生成的源是否安全?

如果这不正确,那么正确的方法是什么?

谢谢!

更新1:

我希望至少Observable.merge不关心线程.我发现了这个,似乎说它们使用起来并不安全:

http://msdn.microsoft.com/en-us/library/ee353488.aspx

http://msdn.microsoft.com/en-us/library/ee353749.aspx

"对于每个观察者来说,注册的中间观察对象不是线程安全的.也就是说,源不会在不同的线程上同时触发源的观察."

那么正确的方法是什么呢?

更新2:

这是一个混乱.我链接的文档是用于从名称空间Microsoft.FSharp.Control.Observable合并和扫描的函数.这与我在我的代码中使用的Rx不同.对于真正的Rx,您需要使用库FSharp.Reactive和FSharp.Reactive.Observable下的函数.

f# multithreading reactive-programming system.reactive

2
推荐指数
1
解决办法
404
查看次数

ReactiveCommand的前后行动

我想在命令执行之前设置忙标志和状态栏文本,并在完成后重置标志和文本.我的工作代码在这里:

Cmd = ReactiveCommand.Create();
Cmd.Subscribe(async _ => 
{
    IsBusy = true;
    StatusBarText = "Doing job...";
    try
    {
        var organization = await DoAsyncJob();
        //do smth w results
    }
    finally
    {
        IsBusy = false;
        StatusBarText = "Ready";
});
Run Code Online (Sandbox Code Playgroud)

是否有可能以"正确的方式"做到这一点?像这样:

Cmd = ReactiveCommand.CreateAsyncTask(_ => DoAsyncJob());
//how to do pre-action?
//is exists more beautiful way to to post-action?
Cmd.Subscribe(res =>
{
    try
    {
        //do smth w results
    }
    finally
    {
        IsBusy = false;
        StatusBarText = "Ready";
    }
});
Run Code Online (Sandbox Code Playgroud)

或这个:

Cmd = ReactiveCommand.CreateAsyncTask(_ => DoAsyncJob()); …
Run Code Online (Sandbox Code Playgroud)

c# system.reactive reactiveui

2
推荐指数
1
解决办法
702
查看次数

Observable.Delay或Observable.Buffer重用相同的线程

是否有某些版本的Observable.Delay或Observable.Buffer不为其计时器使用新线程?也许精度较低..

我有一个场景,我需要在一个observable上调用Observable.Delay,每秒产生几千条消息,这会产生很多线程.

谢谢.

c# system.reactive

2
推荐指数
1
解决办法
617
查看次数

Rx - 暂停Observable.Interval

我正在替换rx的Observable.Interval上的计时器,我遇到了一个问题.我不知道如何暂停这样的计时器.我不是指暂停订阅,而是暂停和恢复计时.我知道如何以肮脏的方式做到这一点,但我想知道更好的解决方案.

我目前的代码:

var RemainingTimes = Observable
            .Interval(TimeSpan.FromMilliseconds(refreshInterval))
            .Select(t => _provider.Duration - TimeSpan.FromMilliseconds(t * refreshInterval))
Run Code Online (Sandbox Code Playgroud)

c# system.reactive observable

2
推荐指数
1
解决办法
1295
查看次数

Observable.FromEvent签名

签署这样的签名的目的是什么Observable.FromEvent?对于例如:

var appActivated = Observable.FromEvent(
h => Application.Current.Activated += h,
h => Application.Current.Activated -= h);
Run Code Online (Sandbox Code Playgroud)

特别是什么h?为什么+=,然后-=?我们Observable从事件或事件处理程序制作?如果从事件,为什么不只是有一个签名,如:

var appActivated = Observable.FromEvent(Application.Current.Activated);
Run Code Online (Sandbox Code Playgroud)

c# reactive-programming system.reactive observable

2
推荐指数
1
解决办法
414
查看次数