我有一个“父”对象列表,我想为每个对象获取一个“子”列表。我想要一个父子列表的地图。所以结果是
Map<Parent, List<Child>> result = new HashMap<>();
Run Code Online (Sandbox Code Playgroud)
我的示例父列表:
List<Parent> parents = new ArrayList<>();
parents.add(new Parent(1, "Parent1"));
parents.add(new Parent(2, "Parent2"));
parents.add(new Parent(3, "Parent3"));
parents.add(new Parent(4, "Parent4"));
parents.add(new Parent(5, "Parent5"));
Run Code Online (Sandbox Code Playgroud)
我想迭代它们,一一问孩子
@GET("api/childs/{parentId}")
Observable<Response<List<Child>>> getChilds(@Path("parentId") int parentId);
Run Code Online (Sandbox Code Playgroud)
什么是最好的 RX 结构?
谢谢你,罗伯特
有时我想触发 aRunnable作为我的Observable序列的一部分,但Runnable不报告进度。
我编写了一个简单的工厂,用于将Runnable对象包装成一个Observable:
public static <T> Observable<T> fromRunnable(final Runnable action) {
if (action == null) {
throw new NullPointerException("action");
}
return Observable.fromPublisher(subscriber -> {
try {
action.run();
subscriber.onComplete();
} catch (final Throwable throwable) {
subscriber.onError(throwable);
}
});
}
Run Code Online (Sandbox Code Playgroud)
用法:
Observable.concat(
someTask,
MoreObservables.fromRunnable(() -> {
System.out.println("Done. ");
}));
Run Code Online (Sandbox Code Playgroud)
但是 RxJava 2 已经提供了这个功能吗?
RxJava 查询:
你好,
我有一个PublishSubject<Boolean> subject = PublishSubject.create();
我订阅了上述主题并在此之后进行 API 调用:
subject.observeOn(IOThread)
.flatMap(boolean -> getSomethingFromServer())
.observeOn(MainThread)
.subscribe(something ->
showSomethingOnView(),
error -> showRetryView();
)
Run Code Online (Sandbox Code Playgroud)
当出现类似UnknownHostException,的错误时SocketTimeoutException,我会显示一个重试按钮。单击重试按钮后,我将向PublishSubject().
subject.onNext(boolean Value);
但是在错误出现后,主题正在终止并且没有其他事件被转发。
在快速搜索中,我可以使用Notification<>包装器实现这一点,但还没有找到应用它的好方法。
这是我找到的两个链接:
此链接提到使用Notification.
此链接使用通知,但包装主题的初始类型,即Boolean在通知中。如何将收到的错误和响应包装getSomethingFromServer()到通知中。
我在这里做错了吗?
谢谢
我有以下 RxJava 2 代码(在 Kotlin 中),其中有一个 Observable
disposable = Observable.create<String>({
subscriber ->
try {
Thread.sleep(2000)
subscriber.onNext("Test")
subscriber.onComplete()
} catch (exception: Exception) {
subscriber.onError(exception)
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({ result -> Log.d("Test", "Completed $result") },
{ error -> Log.e("Test", "Completed ${error.message}") })
Run Code Online (Sandbox Code Playgroud)
当它静止时Thread.sleep(2000),我执行disposable?.dispose()调用,它会出错
FATAL EXCEPTION: RxCachedThreadScheduler-1
Process: com.elyeproj.rxstate, PID: 10202
java.lang.InterruptedException
at java.lang.Thread.sleep(Native Method)
at java.lang.Thread.sleep(Thread.java:371)
at java.lang.Thread.sleep(Thread.java:313)
at presenter.MainPresenter$loadData$1.subscribe(MainPresenter.kt:41)
at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
Run Code Online (Sandbox Code Playgroud)
我希望这dispose将有助于以静默方式取消操作,或者最多Log.e在订阅时捕获错误。但是,它只是按照上面的错误消息崩溃。
为什么异常逃脱了?是不是dispose假设在不崩溃的情况下静默取消整个操作?
我把这个虚拟例子放在一起,试图backpressure更好地理解:
Flowable.range(1, 100).onBackpressureDrop()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(object : DisposableSubscriber<Int>() {
override fun onStart() {
request(1)
}
override fun onComplete() {
Log.d(this@MainActivity::class.java.simpleName, "onComplete")
}
override fun onNext(t: Int?) {
Log.d(this@MainActivity::class.java.simpleName, t.toString())
Thread.sleep(1000)
request(1)
}
override fun onError(t: Throwable?) { //handle error}
})
Run Code Online (Sandbox Code Playgroud)
我有一个非常慢的Subscriber消耗数据的非常快的Flowable. 我正在指示 Flowable 到onBackPressureDrop(). 尽管如此,我的输出看起来像这样(从 1 到 100)
07-16 23:07:21.097 22389-22389 D: 1
07-16 23:07:22.100 22389-22389 D: 2
07-16 23:07:23.102 22389-22389 D: 3
07-16 23:07:24.104 22389-22389 D: ...
07-16 23:07:24.104 22389-22389 D: …Run Code Online (Sandbox Code Playgroud) 我正在观察 a 产生的线条NetworkResource,将其包裹在Observable.create. 这是代码,为简单起见,缺少 try/catch 和取消:
fun linesOf(resource: NetworkResource): Observable<String> =
Observable.create { emitter ->
while (!emitter.isDisposed) {
val line = resource.readLine()
Log.i(TAG, "Emitting: $line")
emitter.onNext(line)
}
}
Run Code Online (Sandbox Code Playgroud)
问题是,后来我想将它变成一个Flowable使用observable.toFlowable(LATEST)添加背压的情况下,我的消费跟不上,但根据我怎么做,消费者停止项目128后接收项目。
A)这样一切正常:
val resource = ...
linesOf(resource)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.toFlowable(BackpressureStrategy.LATEST)
.subscribe { Log.i(TAG, "Consuming: $it") }
Run Code Online (Sandbox Code Playgroud)
B)这里消费者在 128 件物品后被卡住(但发射仍在继续):
val resource = ...
linesOf(resource)
.toFlowable(BackpressureStrategy.LATEST)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe { Log.i(TAG, "Consuming: $it") } // <-- stops after 128
Run Code Online (Sandbox Code Playgroud)
在选项A) 中,一切正常,没有任何问题,我可以看到 …
rxjava2 版本 2.1.5
试图理解 RxJava2 对一个 observable 的多个订阅。有一个简单的文件监视服务,用于跟踪目录中文件的创建、修改、删除。我添加了 2 个订阅者,并希望在两个订阅者上都打印事件。当我将文件复制到监视目录中时,我看到一个订阅者打印出事件。然后,当我删除文件时,我看到第二个订阅者打印出事件。我期待两个订阅者都打印事件。我在这里缺少什么?
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
import java.io.IOException;
import java.nio.file.FileSystem;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.concurrent.TimeUnit;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
public class MyRxJava2DirWatcher {
public Flowable<WatchEvent<?>> createFlowable(WatchService watcher, Path path) {
return Flowable.create(subscriber -> {
boolean error = false;
WatchKey key;
try {
key = path.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
}
catch (IOException e) {
subscriber.onError(e);
error = true;
} …Run Code Online (Sandbox Code Playgroud) 以下代码打印 1、2
Observable.just(1)
.doOnSubscribe(d -> System.out.println(1))
.doOnSubscribe(d -> System.out.println(2))
.blockingSubscribe();
Run Code Online (Sandbox Code Playgroud)
这个打印 2, 1
Observable.just(1)
.doOnSubscribe(d -> System.out.println(1))
.subscribeOn(Schedulers.newThread())
.doOnSubscribe(d -> System.out.println(2))
.blockingSubscribe();
Run Code Online (Sandbox Code Playgroud)
在 RxJava1 中,这两个代码都打印“2, 1”,因为doOnSubscribe在下游订阅上游之前调用。
在 RxJava2 中,订阅是从上游到下游 ( Observer.onSubscribe),但doOnSubscribe仍会在订阅之前调用。于是混乱的秩序出现了。
即使我可以给出一个更混乱的情况:
Observable.just(1)
.doOnSubscribe(d -> System.out.println(1))
.doOnSubscribe(d -> System.out.println(2))
.subscribeOn(Schedulers.newThread())
.doOnSubscribe(d -> System.out.println(3))
.doOnSubscribe(d -> System.out.println(4))
.blockingSubscribe();
Run Code Online (Sandbox Code Playgroud)
它按照我的预期打印“3, 4, 1, 2”,但不是最预期的。
这是设计使然吗?如果是,有什么好处?
我有两个单曲,我想做的是将它们转换为 Completable
final Single<Boolean> httpRequestOne = createHttpRequestOne();
final Single<Boolean> httpRequestTwo = createHttpRequestTwo();
Run Code Online (Sandbox Code Playgroud)
如果两个单曲都返回 true,则 Completable 的结果应该是 onSuccess 否则它将是 onError。
我也希望它们并行运行,所以我认为 concat 在这里无济于事
final Flowable<Boolean> flowable = httpRequestOne.concatWith(httpRequestTwo);
Run Code Online (Sandbox Code Playgroud) 这看起来有点像一个愚蠢的问题,我已经做了一些测试,看看它在实践中是如何工作的,但我希望看到它得到确认,如果可能的话,我想知道为什么就Observable而言合同及其实施,因为在我对Rx规则的理解中感觉像是一个令人讨厌的漏洞.此外,如果你能告诉我在哪里寻找这个,那么它将帮助我将来自己回答这些问题.
如果我使用以下Observable:
Observable.range(0, 3)
.observeOn(schedulerA)
.flatMap(i -> Observable.just(i)
.observeOn(schedulerB)
.map(j -> -j))
.doOnNext(i -> System.out.println(String.format("Got %d", i)))
.subscribe()
Run Code Online (Sandbox Code Playgroud)
那么操作员是否.doOnNext(i -> System.out.println(String.format("Got %d", i)))会执行schedulerA或者schedulerB是否存在正式或基于规范的原因?
谢谢.