我正在尝试编写一个简单的RxScala程序:
import rx.lang.scala.Observable
import scala.concurrent.duration.DurationInt
import scala.language.{implicitConversions, postfixOps}
object Main {
def main(args: Array[String]): Unit = {
val o = Observable.interval(1 second)
o.subscribe(println(_))
}
}
Run Code Online (Sandbox Code Playgroud)
运行该程序时,看不到任何打印输出。我怀疑这是因为该线程在Observable.interval模具中产生数字。我注意到waitFor(o)在RxScalaDemo中有一个对的调用,但是我不知道该从何处导入。
如何使该程序始终运行以打印数字序列?
我使用RxJava/ Retrofit在Android应用程序与MVP模式.
现在我想清理xxxPresenter什么时候Activity/Fragment为了防止破坏oom.
Presenter简单代码:
public class LoginPresenter {
private LoginView mLoginView;
private LoginMode mLoginMode;
private Subscriber mLoginSubscriber;
public LoginPresenter(LoginView loginView) {
this.mLoginView = loginView;
mLoginMode = new LoginMode();
}
void login(String userName, String pwd) {
mLoginSubscriber = new Subscriber() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Object o) {
if (mLoginView != null) {
mLoginView.onLoginSuccess();
}
}
};
mLoginMode.login(userName, pwd, mLoginSubscriber); …Run Code Online (Sandbox Code Playgroud) 有一个类似的问题在这里,但它并没有真正回答这个问题.
我一直在我的Android应用程序中使用RxJava和RxAndroid,现在该项目正常运行.
然后我删除了RxJava并且只留下了RxAndroid,项目继续工作.但是,我不太了解RxJava和RxAndroid是否知道从长远来看是否只会丢失RxAndroid.
我读到的关于RxAndroid的一切都是它是RxJava的扩展,但是这是一个真正的扩展,意味着RxJava拥有一些特定的东西(比如AndroidSchedulers)吗?
反正我还需要两个吗?保持两个依赖关系有缺点吗?
谢谢.
我们正在使用微服务架构,其中顶级服务用于向最终用户公开REST API,后端服务用于查询数据库.
当我们收到1个用户请求时,我们会向后端服务提出~30k的请求.我们使用RxJava进行顶级服务,因此所有30K请求都是并行执行的.我们使用haproxy在后端服务之间分配负载.但是,当我们得到3-5个用户请求时,我们将获得网络连接异常,无路由到主机异常,套接字连接异常.
这种用例的最佳实践是什么?
我有一个列表List<Single<String>>,我想变成一个List<String>.
应保留订单,并且应并行处理所有Single.
List<Single<String>> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
list.add(createSingle(i));
}
// Dummy method which creates a single instance
private Single<String> createSingle(int i) {
return Single.create(sub -> {
new Thread(() -> {
try {
Thread.sleep(800);
} catch (InterruptedException e) {
e.printStackTrace();
}
sub.onSuccess("test_" + i);
}).start();
});
}
Run Code Online (Sandbox Code Playgroud)
我使用了我的初始实现,Observable.concatWith但这基本上会阻止并行处理,因为每个单元都是一个接一个地订阅.
我也知道我可以将单曲列表转换成一个发出字符串的Observable,但在这种情况下我放松了顺序.
List<Observable<String>> obsList = list.stream().map(Single::toObservable).collect(Collectors.toList());
Observable.merge(obsList).....
Run Code Online (Sandbox Code Playgroud) 我使用许多运算符构建了很长的rxjava链(带有改装请求):doOnNext,doOnError,switchIfEmpty,onErrorResumeNext,flatMap在某些设备(例如Android 4.1)上,当链沿最长路线走时,它会引发StackOverflowError异常。
是否有一些方法或最佳实践来优化链或防止StackOverflowError?
现在,我只看到一种方法-断开链,并从第一个onComplete(OnNext)调用第二部分,但我认为这不是被动方式。
另一种方法-使用.subscribeOn(Schedulers.newThread());更改线程 操作员。似乎也不是最好的解决方案。
我的代码:1)订阅代码
fastSearch(keyphrase)
.onErrorResumeNext(throwable -> {
return correctKeyphraseAndSearch(keyphrase);
})
.doOnNext(resultsDao -> {...})
.subscribe(...)
Run Code Online (Sandbox Code Playgroud)
2)辅助方法
public static Observable<SearchResultsDao> fastSearch(final String keyphrase) {
String SRD = "true";
final HttpQueryParams params = new HttpQueryParams();
//read from cache chain
Observable<SearchResultsDao> cacheChain = getCache().fastSearch(keyphrase, SRD)
.doOnNext(...)
.doOnError(...)
.onErrorResumeNext(new HandleNoCacheEntry<SearchResultsDao>(params)); //save some data to "params", and return Observable.empty
//network request chain
Observable<SearchResultsDao> networkChain = getApi().fastSearch(keyphrase, SRD)
.retryWhen(new OnNewSessionRequired())
.doOnNext(new WriteToCacheAction<SearchResultsDao>(params)); //save to cache
//combine cache+network chains
return cacheChain
.switchIfEmpty(networkChain)
.doOnNext(resultsDao -> …Run Code Online (Sandbox Code Playgroud) 在Kotlin中,函数的最终语句可以解释为其返回值.
可以将以下示例的情况简化为更简洁吗?
{ text: String ->
val validated = validateText(text)
if (validated) {
actOnValidation()
}
validated
}
Run Code Online (Sandbox Code Playgroud)
我想要这样做的一个具体案例是在使用RxJava的示例中 - 即使有更好的Rx方法,我也对纯Kotlin解决方案感兴趣(如果存在的话).
fun inputChainObservable(targetField: TextView, chainedField: TextView): Observable<Boolean> {
return targetField.textChanges()
.observeOn(AndroidSchedulers.mainThread())
.map { cs: CharSequence? ->
val hasInput = validateText(cs.toString())
if (hasInput) {
chainedField.requestFocus()
}
hasInput
}
}
Run Code Online (Sandbox Code Playgroud) 我想用采用RxJava 1.1.5与Spring WebFlux(即反应堆堆芯3.1.0.M3)库,但我无法适应Observable到Flux。
我以为这是相对简单的,但是我的适配器不起作用:
import reactor.core.publisher.Flux;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
public static <T> Flux<T> toFlux(Observable<T> observable) {
return Flux.create(emitter -> {
final Subscription subscription = observable.subscribe(new Subscriber<T>() {
@Override
public void onNext(T value) {
emitter.next(value);
}
@Override
public void onCompleted() {
emitter.complete();
}
@Override
public void onError(Throwable throwable) {
emitter.error(throwable);
}
});
emitter.onDispose(subscription::unsubscribe);
});
}
Run Code Online (Sandbox Code Playgroud)
我已经验证过,onNext并且onCompleted都以正确的顺序被调用,但是我Flux始终是空的。有人看到我在做什么错吗?
在相关说明中,为什么反应堆插件中没有RxJava 1适配器?
我正在实现一个observable,在延迟5s后重试错误.我正在使用改造网络.我面临的问题是,当API返回错误时会有很多重试.我想仅在5秒后重试,但是重试以疯狂的速度发生(几乎是一秒钟的三次).知道为什么吗?
userAPI.getUsers()
.filter { it.users.isNotEmpty() }
.subscribeOn(Schedulers.io())
.retryWhen { errors -> errors.flatMap { errors.delay(5, TimeUnit.SECONDS) } }
.observeOn(AndroidSchedulers.mainThread())
.subscribe({}, {})
Run Code Online (Sandbox Code Playgroud)
其中userAPI.getUsers()返回一个可观察的.
疯狂的API请求数量:
08-13 12:31:31.308 26277-26453/com.app.user.dummy D/OkHttp: --> GET https://userapi.com/foo
08-13 12:31:31.825 26277-26453/com.app.user.dummy D/OkHttp: --> GET https://userapi.com/foo
08-13 12:31:32.370 26277-26453/com.app.user.dummy D/OkHttp: --> GET https://userapi.com/foo
08-13 12:31:32.897 26277-26453/com.app.user.dummy D/OkHttp: --> GET https://userapi.com/foo
08-13 12:31:33.436 26277-26453/com.app.user.dummy D/OkHttp: --> GET https://userapi.com/foo
08-13 12:31:33.952 26277-26453/com.app.user.dummy D/OkHttp: --> GET https://userapi.com/foo
08-13 12:31:34.477 26277-26453/com.app.user.dummy D/OkHttp: --> GET https://userapi.com/foo
08-13 12:31:35.020 26277-26453/com.app.user.dummy D/OkHttp: --> GET https://userapi.com/foo …Run Code Online (Sandbox Code Playgroud) 当生产者生产事件的速度快于客户消费时.
我想用可流动与onBackpressureLatest() ,我可以得到最新的情况下发出的.
但事实证明有一个128的默认缓冲区.我得到的是之前缓冲的过时事件.
那么如何才能获得最新的实际活动?
这是示例代码:
Flowable.interval(40, TimeUnit.MILLISECONDS)
.doOnNext{
println("doOnNext $it")
}
.onBackpressureLatest()
.observeOn(Schedulers.single())
.subscribe {
println("subscribe $it")
Thread.sleep(100)
}
Run Code Online (Sandbox Code Playgroud)
我的期望:
doOnNext 0
subscribe 0
doOnNext 1
doOnNext 2
subscribe 2
doOnNext 3
doOnNext 4
doOnNext 5
subscribe 5
doOnNext 6
doOnNext 7
subscribe 7
doOnNext 8
doOnNext 9
doOnNext 10
subscribe 10
...
Run Code Online (Sandbox Code Playgroud)
我得到了什么:
doOnNext 0
subscribe 0
doOnNext 1
doOnNext 2
subscribe 1
doOnNext 3
doOnNext 4
doOnNext 5
subscribe 2
doOnNext 6
doOnNext 7 …Run Code Online (Sandbox Code Playgroud) rx-java ×10
android ×5
rx-java2 ×3
java ×2
architecture ×1
backpressure ×1
haproxy ×1
kotlin ×1
mvp ×1
networking ×1
observable ×1
retrofit ×1
rx-android ×1
rx-kotlin ×1
rx-scala ×1
scala ×1