标签: rx-java

RxScala:如何保持执行Observable.interval的线程还活着?

我正在尝试编写一个简单的RxScala程序:

import rx.lang.scala.Observable

import scala.concurrent.duration.DurationInt
import scala.language.{implicitConversions, postfixOps}

object Main {
  def main(args: Array[String]): Unit = {
    val o = Observable.interval(1 second)
    o.subscribe(println(_))
  }
}
Run Code Online (Sandbox Code Playgroud)

运行该程序时,看不到任何打印输出。我怀疑这是因为该线程在Observable.interval模具中产生数字。我注意到waitFor(o)RxScalaDemo中有一个对的调用,但是我不知道该从何处导入。

如何使该程序始终运行以打印数字序列?

multithreading scala rx-java rx-scala

1
推荐指数
1
解决办法
431
查看次数

RxJava:在Android MVP中自动取消订阅

我使用RxJava/ Retrofit在Android应用程序与MVP模式.

现在我想清理xxxPresenter什么时候Activity/Fragment为了防止破坏oom.

Presenter简单代码:

public class LoginPresenter {

    private LoginView mLoginView;
    private LoginMode mLoginMode;
    private Subscriber mLoginSubscriber;

    public LoginPresenter(LoginView loginView) {
        this.mLoginView = loginView;
        mLoginMode = new LoginMode();
    }

    void login(String userName, String pwd) {
        mLoginSubscriber = new Subscriber() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Object o) {
                if (mLoginView != null) {
                    mLoginView.onLoginSuccess();
                }
            }
        };
        mLoginMode.login(userName, pwd, mLoginSubscriber); …
Run Code Online (Sandbox Code Playgroud)

mvp android rx-java

1
推荐指数
1
解决办法
1405
查看次数

RxJava和RxAndroid - 我需要两者吗?

有一个类似的问题在这里,但它并没有真正回答这个问题.

我一直在我的Android应用程序中使用RxJava和RxAndroid,现在该项目正常运行.

然后我删除了RxJava并且只留下了RxAndroid,项目继续工作.但是,我不太了解RxJava和RxAndroid是否知道从长远来看是否只会丢失RxAndroid.

我读到的关于RxAndroid的一切都是它是RxJava的扩展,但是这是一个真正的扩展,意味着RxJava拥有一些特定的东西(比如AndroidSchedulers)吗?

反正我还需要两个吗?保持两个依赖关系有缺点吗?

谢谢.

android rx-java rx-android

1
推荐指数
1
解决办法
1005
查看次数

如何在微服务架构中处理网络调用

我们正在使用微服务架构,其中顶级服务用于向最终用户公开REST API,后端服务用于查询数据库.

当我们收到1个用户请求时,我们会向后端服务提出~30k的请求.我们使用RxJava进行顶级服务,因此所有30K请求都是并行执行的.我们使用haproxy在后端服务之间分配负载.但是,当我们得到3-5个用户请求时,我们将获得网络连接异常,无路由到主机异常,套接字连接异常.

这种用例的最佳实践是什么?

architecture networking haproxy rx-java microservices

1
推荐指数
1
解决办法
1177
查看次数

如何将List <Single <String >>转换为List <String>?

我有一个列表List<Single<String>>,我想变成一个List<String>.

应保留订单,并且应并行处理所有Single.

List<Single<String>> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
  list.add(createSingle(i));
}

// Dummy method which creates a single instance
private Single<String> createSingle(int i) {
  return Single.create(sub -> {
    new Thread(() -> {
      try {
        Thread.sleep(800);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      sub.onSuccess("test_" + i);
    }).start();
  });
}
Run Code Online (Sandbox Code Playgroud)

我使用了我的初始实现,Observable.concatWith但这基本上会阻止并行处理,因为每个单元都是一个接一个地订阅.

我也知道我可以将单曲列表转换成一个发出字符串的Observable,但在这种情况下我放松了顺序.

List<Observable<String>> obsList = list.stream().map(Single::toObservable).collect(Collectors.toList());
Observable.merge(obsList).....
Run Code Online (Sandbox Code Playgroud)

rx-java

1
推荐指数
1
解决办法
957
查看次数

长链中的rxjava StackOverflowError异常

我使用许多运算符构建了很长的rxjava链(带有改装请求):doOnNext,doOnError,switchIfEmpty,onErrorResumeNext,flatMap在某些设备(例如Android 4.1)上,当链沿最长路线走时,它会引发StackOverflowError异常。

是否有一些方法或最佳实践来优化链或防止StackOverflowError?

现在,我只看到一种方法-断开链,并从第一个onComplete(OnNext)调用第二部分,但我认为这不是被动方式。

另一种方法-使用.subscribeOn(Schedulers.newThread());更改线程 操作员。似乎也不是最好的解决方案。

我的代码:1)订阅代码

fastSearch(keyphrase)
    .onErrorResumeNext(throwable -> {
        return correctKeyphraseAndSearch(keyphrase);
    })
    .doOnNext(resultsDao -> {...})
    .subscribe(...)
Run Code Online (Sandbox Code Playgroud)

2)辅助方法

public static Observable<SearchResultsDao> fastSearch(final String keyphrase) {
    String SRD = "true";
    final HttpQueryParams params = new HttpQueryParams();
    //read from cache chain
    Observable<SearchResultsDao> cacheChain = getCache().fastSearch(keyphrase, SRD)
            .doOnNext(...)
            .doOnError(...)
            .onErrorResumeNext(new HandleNoCacheEntry<SearchResultsDao>(params)); //save some data to "params", and return Observable.empty
    //network request chain
    Observable<SearchResultsDao> networkChain = getApi().fastSearch(keyphrase, SRD)
            .retryWhen(new OnNewSessionRequired())
            .doOnNext(new WriteToCacheAction<SearchResultsDao>(params)); //save to cache
    //combine cache+network chains
    return cacheChain
            .switchIfEmpty(networkChain)
            .doOnNext(resultsDao -> …
Run Code Online (Sandbox Code Playgroud)

stack-overflow android rx-java retrofit jack-compiler

1
推荐指数
1
解决办法
1234
查看次数

对于用作表达式的Kotlin函数,是否有简洁的方法来操作并返回值?

在Kotlin中,函数的最终语句可以解释为其返回值.

可以将以下示例的情况简化为更简洁吗?

{ text: String ->
  val validated = validateText(text)
  if (validated) {
    actOnValidation()
  }
  validated
}
Run Code Online (Sandbox Code Playgroud)

我想要这样做的一个具体案例是在使用RxJava的示例中 - 即使有更好的Rx方法,我也对纯Kotlin解决方案感兴趣(如果存在的话).

fun inputChainObservable(targetField: TextView, chainedField: TextView): Observable<Boolean> {
  return targetField.textChanges()
      .observeOn(AndroidSchedulers.mainThread())
      .map { cs: CharSequence? ->
        val hasInput = validateText(cs.toString())
        if (hasInput) {
          chainedField.requestFocus()
        }
        hasInput
      }
}
Run Code Online (Sandbox Code Playgroud)

android kotlin rx-java rx-kotlin rx-java2

1
推荐指数
1
解决办法
94
查看次数

使RxJava 1.1.5适应Reactor Core 3.1.0.M3

我想用采用RxJava 1.1.5与Spring WebFlux(即反应堆堆芯3.1.0.M3)库,但我无法适应ObservableFlux

我以为这是相对简单的,但是我的适配器不起作用:

import reactor.core.publisher.Flux;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;

public static <T> Flux<T> toFlux(Observable<T> observable) {
    return Flux.create(emitter -> {
        final Subscription subscription = observable.subscribe(new Subscriber<T>() {
            @Override
            public void onNext(T value) {
                emitter.next(value);
            }
            @Override
            public void onCompleted() {
                emitter.complete();
            }
            @Override
            public void onError(Throwable throwable) {
                emitter.error(throwable);
            }
        });
        emitter.onDispose(subscription::unsubscribe);
    });
}
Run Code Online (Sandbox Code Playgroud)

我已经验证过,onNext并且onCompleted都以正确的顺序被调用,但是我Flux始终是空的。有人看到我在做什么错吗?

在相关说明中,为什么反应堆插件中没有RxJava 1适配器?

java reactive-programming rx-java project-reactor

1
推荐指数
1
解决办法
570
查看次数

使用retryWhen()重试observable

我正在实现一个observable,在延迟5s后重试错误.我正在使用改造网络.我面临的问题是,当API返回错误时会有很多重试.我想仅在5秒后重试,但是重试以疯狂的速度发生(几乎是一秒钟的三次).知道为什么吗?

userAPI.getUsers()
       .filter { it.users.isNotEmpty() }
       .subscribeOn(Schedulers.io())
       .retryWhen { errors -> errors.flatMap { errors.delay(5, TimeUnit.SECONDS) } }
       .observeOn(AndroidSchedulers.mainThread())
       .subscribe({}, {})
Run Code Online (Sandbox Code Playgroud)

其中userAPI.getUsers()返回一个可观察的.

疯狂的API请求数量:

08-13 12:31:31.308 26277-26453/com.app.user.dummy D/OkHttp: --> GET https://userapi.com/foo
08-13 12:31:31.825 26277-26453/com.app.user.dummy D/OkHttp: --> GET https://userapi.com/foo
08-13 12:31:32.370 26277-26453/com.app.user.dummy D/OkHttp: --> GET https://userapi.com/foo
08-13 12:31:32.897 26277-26453/com.app.user.dummy D/OkHttp: --> GET https://userapi.com/foo
08-13 12:31:33.436 26277-26453/com.app.user.dummy D/OkHttp: --> GET https://userapi.com/foo
08-13 12:31:33.952 26277-26453/com.app.user.dummy D/OkHttp: --> GET https://userapi.com/foo
08-13 12:31:34.477 26277-26453/com.app.user.dummy D/OkHttp: --> GET https://userapi.com/foo
08-13 12:31:35.020 26277-26453/com.app.user.dummy D/OkHttp: --> GET https://userapi.com/foo …
Run Code Online (Sandbox Code Playgroud)

java android observable rx-java rx-java2

1
推荐指数
1
解决办法
244
查看次数

如何在Rxjava2中获得有关背压的实际最新事件?Flowable.onBackpressureLatest()未按预期工作

当生产者生产事件的速度快于客户消费时.

我想用可流动onBackpressureLatest() ,我可以得到最新的情况下发出的.

但事实证明有一个128的默认缓冲区.我得到的是之前缓冲的过时事件.

那么如何才能获得最新的实际活动?

这是示例代码:

Flowable.interval(40, TimeUnit.MILLISECONDS)
            .doOnNext{
                println("doOnNext $it")
            }
            .onBackpressureLatest()
            .observeOn(Schedulers.single())
            .subscribe {
                println("subscribe $it")
                Thread.sleep(100)
            }
Run Code Online (Sandbox Code Playgroud)

我的期望:

doOnNext    0
    subscribe   0
doOnNext    1
doOnNext    2
    subscribe   2
doOnNext    3
doOnNext    4
doOnNext    5
    subscribe   5
doOnNext    6
doOnNext    7
    subscribe   7
doOnNext    8
doOnNext    9
doOnNext    10
    subscribe   10
...
Run Code Online (Sandbox Code Playgroud)

我得到了什么:

doOnNext    0
    subscribe   0
doOnNext    1
doOnNext    2
    subscribe   1
doOnNext    3
doOnNext    4
doOnNext    5
    subscribe   2
doOnNext    6
doOnNext    7 …
Run Code Online (Sandbox Code Playgroud)

backpressure rx-java rx-java2

1
推荐指数
1
解决办法
45
查看次数