它们之间有什么相似之处和不同之处,看起来Java Parallel Stream有一些RXJava中可用的元素,是吗?
我在Android应用程序上使用Retrofit + RxJava,并且问自己如何处理链接调用的API分页,直到检索到所有数据.是这样的:
Observable<ApiResponse> getResults(@Query("page") int page);
Run Code Online (Sandbox Code Playgroud)
所述ApiResponse对象具有简单的结构:
class ApiResponse {
int current;
Integer next;
List<ResponseObject> results;
}
Run Code Online (Sandbox Code Playgroud)
API将返回下一个值,直到最后一页为止.
有一些很好的方法来实现这一目标吗?试图结合一些flatMaps(),但没有成功.
可观察和可流动的界面似乎是相同的.为什么Flowable是在RxJava 2.0中引入的?我什么时候应该使用Flowable over Observable?
是否可以使用RxJava/RxAndroid和Retrofit执行定期的http请求,每隔x秒更新一次数据?
目前我正在使用一个每x秒触发的IntentService和Recursive Handler/Runnable.我想知道我是否可以删除所有这些并让RxJava处理请求.
final RestClient client = new RestClient();
final ApiService service = client.getApiService();
public interface ApiService {
@GET("/athletes")
public Observable<List<Athlete>> getAthletes();
}
service.getAthletes()
.retry(3)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<List<Athlete>>() {
@Override
public void call(List<Athlete> athletes) {
// Handle Success
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
// Handle Error
}
});
Run Code Online (Sandbox Code Playgroud)
编辑
完成所有操作后,我最终得到了以下代码.欢迎任何更新.
final Scheduler scheduler = Schedulers.from(Executors.newSingleThreadExecutor());
obs = Observable.interval(30, TimeUnit.SECONDS, scheduler)
.flatMap(tick -> service.getAthletes(1, 0l))
// Performed on service.getAthletes() observable
.subscribeOn(scheduler)
.observeOn(AndroidSchedulers.mainThread())
.doOnError(err -> …Run Code Online (Sandbox Code Playgroud) 无法弄清楚为什么会这样.我的调用不会触发任何一个rx回调(onCompleted(),onError(),onNext()).我收到的唯一的东西是这个okhttp输出:
D/OkHttp: --> GET https://api.privatbank.ua/p24api/exchange_rates?json=true&date=20.11.2016 http/1.1
D/OkHttp: --> END GET
D/OkHttp: <-- HTTP FAILED: java.io.IOException: Canceled
Run Code Online (Sandbox Code Playgroud)
改造模块:
@Module
public class RestModule {
@Provides
@Singleton
public HttpLoggingInterceptor providesHttpLogginInterceptor() {
return new HttpLoggingInterceptor().setLevel(HttpLoggingInterceptor.Level.BODY);
}
@Provides
@Singleton
public OkHttpClient providesOkHttpClient(@NonNull HttpLoggingInterceptor loggingInterceptor) {
return new OkHttpClient.Builder()
.addInterceptor(loggingInterceptor)
.connectTimeout(ConstantsManager.CONNECTION_TIME_OUT, TimeUnit.SECONDS)
.readTimeout(ConstantsManager.READ_TIME_OUT, TimeUnit.SECONDS)
.build();
}
@Provides
@Singleton
public Gson providesGson() {
return new GsonBuilder().create();
}
@Provides
@Singleton
public Retrofit providesRetrofit(@NonNull OkHttpClient okHttpClient, @NonNull Gson gson) {
return new Retrofit.Builder()
.baseUrl(ConstantsManager.BASE_URL)
.client(okHttpClient)
.addConverterFactory(SimpleXmlConverterFactory.create())
.addConverterFactory(GsonConverterFactory.create(gson))
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.build(); …Run Code Online (Sandbox Code Playgroud) 我在调用其余的api时遇到上述错误.我正在使用retrofit2和RxJava.
public class ServiceFactory {
public static <T> T createRetrofitService(final Class<T> clazz, final String endpoint){
Retrofit retrofit = new Retrofit.Builder()
.baseUrl(endpoint)
//.addConverterFactory(GsonConverterFactory.create())
.build();
T service = retrofit.create(clazz);
return service;
}
Run Code Online (Sandbox Code Playgroud)
}
public interface MovieService{
//public final String API_KEY = "<apikey>";
public final String SERVICE_END = "https://api.mymovies.org/3/";
@GET("movie/{movieId}??api_key=xyz")
Observable<Response<Movies>> getMovies(@Field("movieId") int movieId);
Run Code Online (Sandbox Code Playgroud)
}
MovieService tmdbService = ServiceFactory.createRetrofitService(MovieService.class, MovieService.SERVICE_END);
Observable<Response<Movies>> responseObservable = tmdbService.getMovies(400);
responseObservable .subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Response<Movies>>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable …Run Code Online (Sandbox Code Playgroud) 我正在尝试升级到Retrofit 2.0并在我的android项目中添加RxJava.我正在进行api调用,并想要将sqlite中的响应数据作为缓存来检索url和它
Observable<MyResponseObject> apiCall(@Body body);
Run Code Online (Sandbox Code Playgroud)
并在RxJava调用中:
myRetrofitObject.apiCall(body).subscribe(new Subscriber<MyResponseObject>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(MyResponseObject myResponseObject) {
}
});
Run Code Online (Sandbox Code Playgroud)
在Retrofit 1.9中,我们可以获得成功回调中的url:
@Override
public void success(MyResponseObject object, Response response) {
String url=response.getUrl();
//save object data and url to sqlite
}
Run Code Online (Sandbox Code Playgroud)
如何使用RxJava进行Retrofit 2.0?
我有Observable流,我想将它转换为Completable,我怎么能这样做?
我在我的android应用程序上使用rxjava和rxvolley.当我尝试运行它时,我收到此错误
Execution failed for task ':testapp:transformResourcesWithMergeJavaResForDebug'.
> com.android.build.api.transform.TransformException: com.android.builder.packaging.DuplicateFileException: Duplicate files copied in APK META-INF/rxjava.properties
File1: C:\Users\Daniel\.gradle\caches\modules-2\files-2.1\io.reactivex\rxjava\1.1.0\748f0546d5c3c27f1aef07270ffea0c45f0c42a4\rxjava-1.1.0.jar
File2: C:\Users\Daniel\.gradle\caches\modules-2\files-2.1\io.reactivex.rxjava2\rxjava\2.0.3\d2f725668bd22e21170381b23f8fbdf72c69d886\rxjava-2.0.3.jar
Run Code Online (Sandbox Code Playgroud)
我有这样的exclude.gradle文件
android {
packagingOptions {
exclude 'META-INF/DEPENDENCIES.txt'
exclude 'META-INF/NOTICE'
exclude 'META-INF/NOTICE.txt'
exclude 'META-INF/LICENSE'
exclude 'META-INF/LICENSE.txt'
exclude 'META-INF/rxjava.properties'
exclude 'META-INF/rxjava.PROPERTIES'
exclude 'META-INF/RXJAVA.properties'
exclude 'META-INF/RXJAVA.PROPERTIES'
exclude 'META-INF/rxjava'
exclude 'META-INF/RXJAVA'
}
lintOptions {
abortOnError false
}
}
Run Code Online (Sandbox Code Playgroud)
我该如何解决这个问题?
我在这里有点疯狂.我正在尝试创建一个Observable<BigDecimal>扩展函数(针对RxJava 2.x)来发出平均排放量,但是我得到了Single.zip()函数的编译错误.有没有人有任何想法我做错了什么?我试图明确我的所有类型,但是没有用......
import io.reactivex.Observable
import io.reactivex.Single
import java.math.BigDecimal
fun Observable<BigDecimal>.sum() = reduce { total, next -> total + next }
//compile error
fun Observable<BigDecimal>.average() = publish().autoConnect(2).let {
Single.zip(it.sum().toSingle(), it.count()) {
sum, count -> sum / BigDecimal.valueOf(count)
}
}
Run Code Online (Sandbox Code Playgroud)
rx-java ×10
android ×6
java ×3
retrofit2 ×3
rx-android ×3
retrofit ×2
java-stream ×1
kotlin ×1
observable ×1
okhttp3 ×1
pagination ×1
rx-java2 ×1