这是我到目前为止开发的代码:
var observable = Observable.Create<string>(async observer =>
{
var wc = new WebClient { UseDefaultCredentials = true };
observer.OnNext(await wc.DownloadStringTaskAsync("http://ya.ru"));
});
observable.Subscribe(
res => Debug.WriteLine("got result: {0}", res),
exc => Debug.WriteLine("exception: {0}", exc.Message)
);
Run Code Online (Sandbox Code Playgroud)
这正确地获取网站的数据并触发我的回调一次.我想要的是有一个无限循环,其行为如下:等待结果 - >调用OnNext- >等待n秒 - >重复一个操作.
创建一个Observable.Interval和SelectMany我的Observable意志不会做,因为这将在一段固定的时间内查询网站.我希望下一次调用仅在上一次成功或失败后触发.实现这一目标的最优雅,最简洁的方法是什么?
我有以下代码(*),它使用递归调用提供的observable的调度程序实现轮询.
(*)灵感来自https://github.com/ReactiveX/RxJava/issues/448
当我只将onNext事件传递给订阅者时,这正常工作.但是当我将onError事件传递给订阅者时,将调用取消订阅事件,这反过来会导致调度程序被杀死.
我还想将错误传递给订阅者.任何想法如何实现?
public Observable<Status> observe() {
return Observable.create(new PollingSubscriberAction<>(service.getStatusObservable(), 5, TimeUnit.SECONDS));
}
private class PollingSubscriberAction<T> implements Observable.OnSubscribe<T> {
private Subscription subscription;
private Subscription innerSubscription;
private Scheduler.Worker worker = Schedulers.newThread().createWorker();
private Observable<T> observable;
private long delayTime;
private TimeUnit unit;
public PollingSubscriberAction(final Observable<T> observable, long delayTime, TimeUnit unit) {
this.observable = observable;
this.delayTime = delayTime;
this.unit = unit;
}
@Override
public void call(final Subscriber<? super T> subscriber) {
subscription = worker.schedule(new Action0() {
@Override
public …Run Code Online (Sandbox Code Playgroud) IScheduler接口提供
public static IDisposable Schedule(this IScheduler scheduler, Action action)
Run Code Online (Sandbox Code Playgroud)
和
public static IDisposable ScheduleAsync(this IScheduler scheduler, Func<IScheduler, CancellationToken, System.Threading.Tasks.Task<IDisposable>> action)
Run Code Online (Sandbox Code Playgroud)
ScheduleAsync的方法说明:
// Summary:
// Schedules work using an asynchronous method, allowing for cooperative scheduling
// in an imperative coding style.
//
// Parameters:
// scheduler:
// Scheduler to schedule work on.
//
// action:
// Asynchronous method to run the work, using Yield and Sleep operations for
// cooperative scheduling and injection of cancellation points.
//
// Returns:
// Disposable …Run Code Online (Sandbox Code Playgroud)