相关疑难解决方法(0)

IObservable以无限循环产生结果

这是我到目前为止开发的代码:

var observable = Observable.Create<string>(async observer =>
{
    var wc = new WebClient { UseDefaultCredentials = true };
    observer.OnNext(await wc.DownloadStringTaskAsync("http://ya.ru"));
});

observable.Subscribe(
    res => Debug.WriteLine("got result: {0}", res), 
    exc => Debug.WriteLine("exception: {0}", exc.Message)
); 
Run Code Online (Sandbox Code Playgroud)

这正确地获取网站的数据并触发我的回调一次.我想要的是有一个无限循环,其行为如下:等待结果 - >调用OnNext- >等待n秒 - >重复一个操作.

创建一个Observable.IntervalSelectMany我的Observable意志不会做,因为这将在一段固定的时间内查询网站.我希望下一次调用仅在上一次成功或失败后触发.实现这一目标的最优雅,最简洁的方法是什么?

c# loops system.reactive observable windows-phone-8

4
推荐指数
1
解决办法
1882
查看次数

RxJava:调用onError而不完成/取消订阅

我有以下代码(*),它使用递归调用提供的observable的调度程序实现轮询.

(*)灵感来自https://github.com/ReactiveX/RxJava/issues/448

当我只将onNext事件传递给订阅者时,这正常工作.但是当我将onError事件传递给订阅者时,将调用取消订阅事件,这反过来会导致调度程序被杀死.

我还想将错误传递给订阅者.任何想法如何实现?

public Observable<Status> observe() {
    return Observable.create(new PollingSubscriberAction<>(service.getStatusObservable(), 5, TimeUnit.SECONDS));
}

private class PollingSubscriberAction<T> implements Observable.OnSubscribe<T> {
    private Subscription subscription;
    private Subscription innerSubscription;
    private Scheduler.Worker worker = Schedulers.newThread().createWorker();

    private Observable<T> observable;
    private long delayTime;
    private TimeUnit unit;

    public PollingSubscriberAction(final Observable<T> observable, long delayTime, TimeUnit unit) {
        this.observable = observable;
        this.delayTime = delayTime;
        this.unit = unit;
    }

    @Override
    public void call(final Subscriber<? super T> subscriber) {
        subscription = worker.schedule(new Action0() {
            @Override
            public …
Run Code Online (Sandbox Code Playgroud)

error-handling rx-java

4
推荐指数
2
解决办法
7818
查看次数

IScheduler.Schedule vs IScheduler.ScheduleAsync?

IScheduler接口提供

public static IDisposable Schedule(this IScheduler scheduler, Action action)
Run Code Online (Sandbox Code Playgroud)

public static IDisposable ScheduleAsync(this IScheduler scheduler, Func<IScheduler, CancellationToken, System.Threading.Tasks.Task<IDisposable>> action)
Run Code Online (Sandbox Code Playgroud)

ScheduleAsync的方法说明:

    // Summary:
    //     Schedules work using an asynchronous method, allowing for cooperative scheduling
    //     in an imperative coding style.
    //
    // Parameters:
    //   scheduler:
    //     Scheduler to schedule work on.
    //
    //   action:
    //     Asynchronous method to run the work, using Yield and Sleep operations for
    //     cooperative scheduling and injection of cancellation points.
    //
    // Returns:
    //     Disposable …
Run Code Online (Sandbox Code Playgroud)

c# system.reactive

1
推荐指数
1
解决办法
2932
查看次数