标签: system.reactive

如何利用Microsoft Rx Framework在WinRT/Windows 8中有效地实现Bing Map

在我的电子商务应用程序中,我需要在Bing地图中绘制我附近的商店,而我的另一个要求是在缩放和调整地图时我需要根据地图中心更新我的商店.因此,为了实现这一点,我主要选择传统的编码方式.步骤如下.

  1. 首次启动我将发送api请求位置,并将在地图上绘制商店.

  2. 在Maps ViewChanged事件中,我将根据当前商店的地图发送对附近商店的后续请求.所以在这个实现过程中,我在单个api请求中获得了大约400个商店.我会在地图上绘制这个.但是当我缩放或平移地图时,它会同时发送多个请求并尝试更新UI上的图钉,最终它会阻止UI和Map在我的应用程序中表现得非常糟糕.

在Google搜索期间,我发现了许多关于使用Microsoft Rx框架实现类似功能的建议.但没有得到任何正确的代码示例来实现我的目标.任何人都可以帮助我或指导我解决我的问题.请记住,我需要在一次请求中在地图上平均绘制400个商店.

问候,

Stez.

bing system.reactive windows-8 windows-runtime

28
推荐指数
1
解决办法
439
查看次数

在RxJS中将特定时间量的可观察值分开

在特定时间内产生Observable值的最惯用方法是什么?例如,假设我有一个从大数组创建的Observable,我想每2秒产生一个值.是的组合intervalselectMany最佳的方式是什么?

javascript system.reactive rxjs

28
推荐指数
5
解决办法
1万
查看次数

Rx:我如何立即响应,并限制后续请求

我想建立一个可以立即响应事件的Rx订阅,然后忽略在指定的"冷却"时段内发生的后续事件.

开箱即用的Throttle/Buffer方法仅在超时过后响应,这不是我需要的.

下面是一些设置场景的代码,并使用Throttle(这不是我想要的解决方案):

class Program
{
    static Stopwatch sw = new Stopwatch();

    static void Main(string[] args)
    {
        var subject = new Subject<int>();
        var timeout = TimeSpan.FromMilliseconds(500);

        subject
            .Throttle(timeout)
            .Subscribe(DoStuff);

        var factory = new TaskFactory();

        sw.Start();

        factory.StartNew(() =>
        {
            Console.WriteLine("Batch 1 (no delay)");
            subject.OnNext(1);
        });

        factory.StartNewDelayed(1000, () =>
        {
            Console.WriteLine("Batch 2 (1s delay)");
            subject.OnNext(2);
        });

        factory.StartNewDelayed(1300, () =>
        {
            Console.WriteLine("Batch 3 (1.3s delay)");
            subject.OnNext(3);
        });

        factory.StartNewDelayed(1600, () =>
        {
            Console.WriteLine("Batch 4 (1.6s delay)");
            subject.OnNext(4);
        });

        Console.ReadKey();
        sw.Stop();
    }

    private static void …
Run Code Online (Sandbox Code Playgroud)

c# system.reactive

27
推荐指数
3
解决办法
5572
查看次数

在面向.NET 4+的库中公开通知时,IObservable是否应优先于事件

我有一个.NET库,作为对象模型的一部分将发出某些事件的通知.

在我看来,事件的主要优点是初学者的可接近性(以及某些消费环境中的简单性),主要的负面因素是它们不可组合,因此如果你想做任何有趣的事情而不写代码丛林.Observable.FromEvent

正在解决的问题的本质是事件流量不会特别频繁或大量(它肯定不会尖叫RX),但绝对没有要求支持4.0之前的.NET版本[因此我可以使用内置IObservable接口,System.Reactive不会对消费者造成任何重大依赖].我感兴趣的是一些一般原则一些特定的具体原因,更喜欢IObservablesevent从API设计的角度看小号虽然-无论在哪里,我的具体情况可能在坐event- IObservable频谱.

那么,问题是:

如果我选择最简单的东西并暴露一个event而不是一个IObservable

或者,重申:除了有做一个消费者Observable.FromEvent*能够撰写事件,是真的不是一个单一的理由,更喜欢一个IObservable比一个event的API中暴露的通知是什么时候?

IObservable用于非尖叫-RT内容或编码指南的项目的引用将是理想的但并不重要.


在@Adam Houldsworth的评论中提到的NB,我对.NET 4+库的API表面的具体内容感兴趣,而不是对我们这个时代代表更好的"默认架构"的意见调查: )

注意,在C#中的IObserver和IObservable中已经触及了这个问题,对于Observer vs Delegates,事件IObservable与普通事件或为什么我应该使用IObservable?.由于SRP违规,我提出的问题的方面没有在任何答复中得到解决.另一个略微重叠的问题是.NET Rx优于经典事件?.(使用IObservable而不是事件)[ 使用IObservable而不是事件属于同一类别.

.net c# events system.reactive

27
推荐指数
2
解决办法
2420
查看次数

如何处理RxJava中观察者的onNext引发的异常?

请考虑以下示例:

Observable.range(1, 10).subscribe(i -> {
    System.out.println(i);

    if (i == 5) {
        throw new RuntimeException("oops!");
    }
}, Throwable::printStackTrace);
Run Code Online (Sandbox Code Playgroud)

这将输出1到5之间的数字,然后打印异常.

我想要实现的是让观察者保持订阅并在抛出异常后继续运行,即打印从1到10的所有数字.

我已经尝试过使用retry()其他各种错误处理操作符,但是,如文档中所述,它们的目的是处理observable本身发出的错误.

最简单的解决方案就是将整个主体包装onNext成一个try-catch块,但这对我来说听起来不是一个好方法.在类似的Rx.NET问题中,建议的解决方案是创建一个扩展方法,通过创建代理可观察来进行包装.我试图重拍它:

Observable<Integer> origin = Observable.range(1, 10);
Observable<Integer> proxy = Observable.create((Observable.OnSubscribe<Integer>) s ->
        origin.subscribe(i -> {try { s.onNext(i); } catch (Exception ignored) {}}, s::onError, s::onCompleted));

proxy.subscribe(i -> {
    System.out.println(i);

    if (i == 5) {
        throw new RuntimeException("oops!");
    }
}, Throwable::printStackTrace);
Run Code Online (Sandbox Code Playgroud)

这不会改变任何东西,因为RxJava本身将订阅者包装成一个SafeSubscriber.使用unsafeSubscribe它来解决它似乎也不是一个好的解决方案.

我该怎么做才能解决这个问题?

java reactive-programming system.reactive rx-java

27
推荐指数
1
解决办法
6656
查看次数

各种ISubject实现做了什么以及何时使用它们?

我对Subject类的功能以及何时使用它有一个相当好的想法,但我一直在查看msdn上的语言参考,并看到有各种其他ISubject实现,例如:

  • AsyncSubject
  • BehaviorSubject
  • ReplaySubject

由于文档在实地非常薄弱,这些类型的重点是什么,在什么情况下你会使用它们?

system.reactive

26
推荐指数
3
解决办法
1921
查看次数

使用Reactive Extensions进行调试的技巧?

我正在寻找有关如何使RX更容易调试的想法.当源通过组合器和节流阀以及发布等时,找到失败点可能非常困难.

到目前为止,我一直在做与复杂的Enumerable链类似的事情 - 插入Do()进行跟踪,将一个"name"字段添加到匿名类型中,有时会抓住堆栈跟踪.但是我们可能有数百个我们系统中的生产者和成千上万的消费者,并且很难分离问题.

你有什么样的技巧来调试你的RX使用?

system.reactive

26
推荐指数
2
解决办法
2495
查看次数

SynchronizationContext和TaskScheduler之间的概念区别是什么

Stephen Toub在博客写道

SynchronizationContext和TaskScheduler都是表示"调度程序"的抽象,是您提供一些工作的东西,它决定了运行该工作的时间和地点.有许多不同形式的调度程序.例如,ThreadPool是一个调度程序:您调用ThreadPool.QueueUserWorkItem来提供一个委托来运行,该委托被排队,并且其中一个ThreadPool的线程最终选择并运行该委托.您的用户界面还有一个调度程序:消息泵.

因此System.Reactive.Concurrency.EventLoopScheduler,Reactive Extensions的Dispatcher,ThreadPool,TaskScheduler,SyncrhonizationContextIScheduler实现在这个意义上都是"调度程序".

他们之间有什么区别?

他们为什么都是必要的?我想我得到EventLoop,Dispatcher,ThreadPool.IScheduler也有很好的解释.
但是TaskScheduler和SyncrhonizationContext仍然不清楚.

Stephen Cleary的优秀文章解释了SyncrhonizationContext,我想我明白了.那么为什么我们需要TaskScheduler,目前尚不清楚.

请解释或指向消息来源.

c# multithreading conceptual task-parallel-library system.reactive

26
推荐指数
3
解决办法
4865
查看次数

如何在不使用Subject <T>支持字段的情况下公开IObservable <T>属性

这个关于关于Subject<T>Enigmativity 的问题的答案中提到:

另外,你应该尽量避免使用主题.一般规则是,如果你正在使用一个主题,那么你做错了什么.

我经常使用主题作为IObservable属性的支持字段,这可能是在Rx之前的几天中的.NET事件.例如,而不是像

public class Thing
{
    public event EventHandler SomethingHappened;

    private void DoSomething()
    {
        Blah();
        SomethingHappened(this, EventArgs.Empty);
    }
}
Run Code Online (Sandbox Code Playgroud)

我可能会这样做

public class Thing
{
    private readonly Subject<Unit> somethingHappened = new Subject<Unit>();
    public IObservable<Unit> SomethingHappened
    {
        get { return somethingHappened; }
    }

    private void DoSomething()
    {
        Blah();
        somethingHappened.OnNext(Unit.Default);
    }
}
Run Code Online (Sandbox Code Playgroud)

那么,如果我想避免使用Subject什么是正确的方式来做这种事情?或者我应该坚持在我的界面中使用.NET事件,即使它们被Rx代码使用(很可能FromEventPattern)?

此外,更多详细信息,为什么使用Subject这样一个坏主意将是有帮助的.

更新:为了使这个问题更加具体,我正在讨论使用Subject<T>从非Rx代码(可能是你正在使用其他一些遗留代码)到Rx世界的方法.所以,像:

class MyVolumeCallback : LegacyApiForSomeHardware
{
    private readonly Subject<int> volumeChanged …
Run Code Online (Sandbox Code Playgroud)

c# system.reactive

26
推荐指数
2
解决办法
3562
查看次数

RxJava和观察者代码的并行执行

我使用RxJava Observable api获得以下代码:

Observable<Info> observable = fileProcessor.processFileObservable(processedFile.getAbsolutePath());
    observable
      .buffer(10000)
      .observeOn(Schedulers.computation())
      .subscribe(recordInfo -> {
        _logger.info("Running stage2 on thread with id : " + Thread.currentThread().getId());
          for(Info info : recordInfo) {
            // some I/O operation logic
         }
      }, 
      exception -> {
      }, 
      () -> {
      });
Run Code Online (Sandbox Code Playgroud)

我的期望是,在指定了计算调度程序之后,观察代码即subscribe()方法中的代码将并行执行.相反,代码仍然在单个线程上顺序执行.如何使用RxJava api使代码并行运行.

java system.reactive rx-java

26
推荐指数
2
解决办法
2万
查看次数