组播Singles,Maybes和Completables的推荐方法是什么?是否只是建议将它们变成标准的Observable来组播它们?
Observable<String> strings =
Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon");
Observable<Integer> lengths = strings.map(String::length);
ConnectableObservable<List<Integer>> lengthsList =
lengths.toList().toObservable().publish();
Run Code Online (Sandbox Code Playgroud)
我有点惊讶,没有ConnectableSingle
,ConnectableMaybe
和ConnectableCompletable
.这背后的原因是什么?开发的努力是不值得的?
在RxJava1中执行以下操作时,onNext中的异常将重新路由到同一订户的onError中:
Observable.from(Arrays.asList("1", "22", "333", "4444")).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
Log.d("RxJava1", "onError: " + e.getCause());
}
@Override
public void onNext(String s) {
if (s.length() == 4) {
Integer test = null;
test.hashCode();
}
Log.d("RxJava1", s + " - " + s.length());
}
});
Run Code Online (Sandbox Code Playgroud)
输出:
D/RxJava1: 1 - 1
D/RxJava1: 22 - 2
D/RxJava1: 333 - 3
D/RxJava1: onError: null
Run Code Online (Sandbox Code Playgroud)
据我所知,在RxJava2中这样做时,此行为已更改,不再返回onError,而只是崩溃:
Observable.fromIterable(Arrays.asList("1", "22", "333", "4444")).subscribeWith(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) …
Run Code Online (Sandbox Code Playgroud) 我有一个PublishSubject
和一个Subscriber
我用来处理(可能)无限的预处理数据流.问题是某些元素可能包含一些错误.我想忽略它们并继续处理.我怎么能这样做?我尝试过这样的事情:
val subject = PublishSubject.create<String>()
subject.retry().subscribe({
println("next: $it")
}, {
println("error")
}, {
println("complete")
})
subject.onNext("foo")
subject.onNext("bar")
subject.onError(RuntimeException())
subject.onNext("wom")
subject.onComplete()
Run Code Online (Sandbox Code Playgroud)
我的问题是没有任何错误处理方法可以帮助我:
onErrorResumeNext()
- 指示Observable在遇到错误时发出一系列项目
onErrorReturn(?)
- 指示Observable在遇到错误时发出特定项目
onExceptionResumeNext(?)
- 指示Observable在遇到异常后继续发出项目(但不是另一种可抛出的项目)
retry(?)
- 如果源Observable发出错误,请重新订阅它,希望它能完成而不会出错
retryWhen(?)
- 如果源Observable发出错误,请将该错误传递给另一个Observable以确定是否重新订阅源
我尝试retry()
了例子,但它无限期地在错误之后挂起我的进程.
我也试过,onErrorResumeNext()
但它没有按预期工作:
val backupSubject = PublishSubject.create<String>()
val subject = PublishSubject.create<String>()
var currentSubject = subject
subject.onErrorResumeNext(backupSubject).subscribe({
println("next: $it")
}, {
println("error")
currentSubject = backupSubject
}, {
println("complete")
})
backupSubject.subscribe({
println("backup")
}, {
println("backup error")
})
currentSubject.onNext("foo")
currentSubject.onNext("bar") …
Run Code Online (Sandbox Code Playgroud) 我的用例是想在我的onNext中的某个条件之后处理.所以尝试使用DisposableObserver.这是有效的代码
Observable.just(1, 2, 3, 4)
.subscribe(new DisposableObserver<Integer>() {
@Override
public void onNext(Integer integer) {
System.out.println("onNext() received: " + integer);
if (integer == 2) {
dispose();
}
}
@Override
public void onError(Throwable e) { System.out.println("onError()"); }
@Override
public void onComplete() { System.out.println("onComplete()"); }
}
);
Run Code Online (Sandbox Code Playgroud)
现在,如果你尝试用lambda替换它,它会将lambda视为
subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete)
Run Code Online (Sandbox Code Playgroud)
现在这样做.通过从onSubscribe保存一次性用品,然后调用disposable.dispose(); 来自onNext.
private Disposable disposable;
private void disposableObserverTest() {
Observable.just(1, 2, 3, 4)
.subscribe(integer -> {
System.out.println("onNext() received: " + integer);
if (integer == 2) { …
Run Code Online (Sandbox Code Playgroud) AFAIK debounce()
是rxJava 的运营商,用于延迟事件的发射.当我在搜索框中应用它时,它正常工作:
RxTextView.textChangeEvents(editText)
.debounce(1000, TimeUnit.MILLISECONDS) //Only emit after 1 sec
.subscribe(new Observer<TextViewTextChangeEvent>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(TextViewTextChangeEvent event) {
//Get more information about text change event
Log.e(TAG, "Before: " + event.before() + ", start: " + event.start() + ", count: " + event.count());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.e(TAG, "textChangeEvents: onComplete");
}
});
Run Code Online (Sandbox Code Playgroud)
但是当我Observable.range()
像这样应用它时:
Observable.range(1, 10000)
.debounce(1000, TimeUnit.MILLISECONDS)
.subscribe(new Observer<Long>() { …
Run Code Online (Sandbox Code Playgroud) 我已经成为Java 8中Java函数编程以及Rx java的忠实粉丝.但是一位同事最近指出,使用这些产品会影响性能.因此决定运行JMH Bench标记,但看起来他是对的.无论我做什么,我都无法获得流版本以提供更好的性能.以下是我的代码
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@BenchmarkMode(Mode.AverageTime)
@OperationsPerInvocation(StreamVsVanilla.N)
public class StreamVsVanilla {
public static final int N = 10000;
static List<Integer> sourceList = new ArrayList<>(N);
static {
for (int i = 0; i < N; i++) {
sourceList.add(i);
}
}
@Benchmark
public List<Double> vanilla() {
List<Double> result = new ArrayList<Double>(sourceList.size() / 2 + 1);
for (Integer i : sourceList) {
if (i % 2 == 0){
result.add(Math.sqrt(i));
}
}
return result;
}
@Benchmark
public List<Double> stream() {
return sourceList.stream().parallel()
.mapToInt(Integer::intValue)
.filter(i …
Run Code Online (Sandbox Code Playgroud) 我收到此随机异常,导致在进行翻新的网络通话时崩溃了我的应用程序,并且正在寻找解决方法的指导
Fatal Exception: java.io.InterruptedIOException: thread interrupted
at okio.Timeout.throwIfReached(Timeout.java:145)
at okio.Okio$1.write(Okio.java:76)
at okio.AsyncTimeout$1.write(AsyncTimeout.java:180)
at okio.RealBufferedSink.flush(RealBufferedSink.java:216)
at okhttp3.internal.http2.Http2Writer.flush(Http2Writer.java:121)
at okhttp3.internal.http2.Http2Connection.newStream(Http2Connection.java:239)
at okhttp3.internal.http2.Http2Connection.newStream(Http2Connection.java:205)
at okhttp3.internal.http2.Http2Codec.writeRequestHeaders(Http2Codec.java:111)
at okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.java:50)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
at com.happycorp.happy.happyapp.util.Network$1.intercept(Network.java:80)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
at com.happycorp.android.commondata.net.RetrofitFactory$CustomHttpMetricsLogger.intercept(RetrofitFactory.java:139)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:45)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:125)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
at com.happycorp.android.commondata.net.RetrofitFactory$1.intercept(RetrofitFactory.java:83)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
at okhttp3.logging.HttpLoggingInterceptor.intercept(HttpLoggingInterceptor.java:212)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:200)
at okhttp3.RealCall.execute(RealCall.java:77)
at retrofit2.OkHttpCall.execute(OkHttpCall.java:180)
at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:41) …
Run Code Online (Sandbox Code Playgroud) 我有一个网络调用列表(他们返回一个Single<Item>
),我想并行执行它们.一旦所有的呼叫成功,我想得到一个List<Item>
,按照与之相同的顺序List<Single<Item>>
.
我已经设法做到并且它有效,但它似乎有点太复杂,我猜我必须有一个更简单的方法,因为这似乎是一个非常常见的用例?
这是我做的:
List<Single<Pair<Item, Integer>>> itemSingles = new ArrayList<>();
for (int index = 0; index < itemCount - 1; index++) {
int finalIndex = index;
itemSingles.add(
fetchItem(...)
.map(item -> new Pair<>(item, finalIndex))
);
}
Single.merge(itemSingles)
.sorted((o1, o2) -> o1.second.compareTo(o2.second))
.map(itemPair -> itemPair.first)
.toList()
.subscribe(items -> { ... });
Run Code Online (Sandbox Code Playgroud)
正如您所看到的,我必须使用它Pair<Item, Integer>
来绑定每个项目的位置,然后我可以使用该位置进行排序.
有更简单的方法吗?
我有以下代码
sealed class AddressUiState
object AddressLoading : AddressUiState()
class AddressLoadedState(val addressResponse: AddressBookResponse) : AddressUiState()
class AddressErrorState(val error: Throwable) : AddressUiState()
Run Code Online (Sandbox Code Playgroud)
我有如下的ViewModel
class AddressViewModel constructor(private val service: SingleProfileService) : ViewModel() {
fun getDisplayableAddressState(id: String): Observable<out AddressUiState> {
return service.getAddresses(id)
.map { AddressLoadedState(it) }
.startWith(AddressLoading)
.onErrorReturn { AddressErrorState(it) }
.subscribeOn(Schedulers.io())
}
}
Run Code Online (Sandbox Code Playgroud)
我看到编译错误和错误类型不匹配的 onErrorReturn 。必填:AddressLoadedState!找到:AddressErrorState 上面的代码有什么问题?
今天我试着解决一个小小的挑战:
您是一家拥有500个办事处的大公司,您想要计算全球收入(每个办事处的收入总和).
每个办公室都提供服务以获得收入.该呼叫需要一定的延迟(网络,数据库访问,......).
显然,您希望尽可能快地获得全球收入.
首先我在python中尝试了非常好的结果:
import asyncio
import time
DELAYS = (475, 500, 375, 100, 250, 125, 150, 225, 200, 425, 275, 350, 450, 325, 400, 300, 175)
class Office:
def __init__(self, delay, name, revenue):
self.delay = delay
self.name = name
self.revenue = revenue
async def compute(self):
await asyncio.sleep(self.delay / 1000)
print(f'{self.name} finished in {self.delay}ms')
return self.revenue
async def main(offices, totest):
computed = sum(await asyncio.gather(*[o.compute() for o in offices]))
verdict = ['nok', 'ok'][computed == totest]
print(f'Sum of revenues = {computed} …
Run Code Online (Sandbox Code Playgroud) rx-java2 ×10
rx-java ×7
java ×4
android ×3
kotlin ×2
rx-android ×2
java-8 ×1
java-stream ×1
lambda ×1
okhttp3 ×1
performance ×1
retrofit2 ×1