标签: rx-android

rxJava调度程序用例

在RxJava中有5种不同的调度程序可供选择:

  1. immediate():创建并返回一个在当前线程上立即执行工作的Scheduler.

  2. trampoline():创建并返回一个调度程序,该调度程序对当前工作完成后要执行的当前线程进行排队.

  3. newThread():创建并返回一个Scheduler,为每个工作单元创建一个新的Thread.

  4. computation():创建并返回用于计算工作的Scheduler.这可以用于事件循环,处理回调和其他计算工作.不要在此调度程序上执行IO绑定的工作.使用调度程序.io()代替.

  5. io():创建并返回一个用于IO绑定工作的Scheduler.该实现由Executor线程池支持,该线程池将根据需要增长.这可用于异步执行阻塞IO.不要在此调度程序上执行计算工作.使用调度程序.计算()而不是.

问题:

前3个调度程序非常自我解释; 但是,我对计算io有点困惑.

  1. 究竟什么是"IO限制工作"?它用于处理streams(java.io)和files(java.nio.files)吗?它用于数据库查询吗?它是用于下载文件还是访问REST API?
  2. 如何计算()从不同newThread() ?是每次所有的calculate()调用都在单个(后台)线程而不是新的(后台)线程上吗?
  3. 为什么在进行IO工作时调用calculate()会很糟糕?
  4. 为什么在进行计算工作时调用io()会很糟糕?

java multithreading thread-safety rx-java rx-android

245
推荐指数
2
解决办法
2万
查看次数

何时在Android中使用RxJava以及何时使用Android Architectural Components中的LiveData?

我没有理由在Android中使用RxJava,在Android Architectural Components中使用LiveData.如果两者之间的用例和差异以及代码形式的示例示例进行解释,这将解释两者之间的差异,这将非常有用.

android rx-android reactive rx-java2 android-architecture-components

158
推荐指数
6
解决办法
4万
查看次数

使用Retrofit 2.0和RxJava获取响应状态代码

我正在尝试升级到Retrofit 2.0并在我的android项目中添加RxJava.我正在进行api调用,并希望在服务器发出错误响应的情况下检索错误代码.

Observable<MyResponseObject> apiCall(@Body body);
Run Code Online (Sandbox Code Playgroud)

并在RxJava调用中:

myRetrofitObject.apiCall(body).subscribe(new Subscriber<MyResponseObject>() {
        @Override
        public void onCompleted() {

        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(MyResponseObject myResponseObject) {
           //On response from server
        }
    });
Run Code Online (Sandbox Code Playgroud)

在Retrofit 1.9中,RetrofitError仍然存在,我们可以通过以下方式获得状态:

error.getResponse().getStatus()
Run Code Online (Sandbox Code Playgroud)

如何使用RxJava进行Retrofit 2.0?

android android-networking rx-java retrofit rx-android

101
推荐指数
3
解决办法
5万
查看次数

无法为io.reactivex.Observable创建调用适配器

我将向我的服务器(它是Rails应用程序)发送一个简单的get方法,并使用RxJava和Retrofit获取结果.我做的是:

我的界面:

public interface ApiCall {
    String SERVICE_ENDPOINT = "https://198.50.214.15";
    @GET("/api/post")
    io.reactivex.Observable<Post> getPost();
}
Run Code Online (Sandbox Code Playgroud)

我的模型是这样的:

public class Post
{
    @SerializedName("id")
    private String id;
    @SerializedName("body")
    private String body;
    @SerializedName("title")
    private String title;

    public String getId ()
    {
        return id;
    }


    public String getBody ()
    {
        return body;
    }


    public String getTitle ()
    {
        return title;
    }

}
Run Code Online (Sandbox Code Playgroud)

这就是我在活动中所做的:

public class Javax extends AppCompatActivity {
    RecyclerView rvListContainer;
    postAdapter postAdapter;
    List<String> messageList=new ArrayList<>();
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_javax);

        rvListContainer=(RecyclerView)findViewById(R.id.recyclerView);
        postAdapter=new …
Run Code Online (Sandbox Code Playgroud)

rx-java retrofit rx-android retrofit2 rx-java2

85
推荐指数
4
解决办法
3万
查看次数

使用Rxjava Schedulers.newThread()和Schedulers.io()进行改造

在网络请求中使用Schedulers.newThread()vs 有什么好处.我见过许多使用的例子,但我想了解原因.Schedulers.io()Retrofitio()

示例情况:

observable.onErrorResumeNext(refreshTokenAndRetry(observable))
    .subscribeOn(Schedulers.newThread())
    .observeOn(AndroidSchedulers.mainThread())...
Run Code Online (Sandbox Code Playgroud)

VS

observable.onErrorResumeNext(refreshTokenAndRetry(observable))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())...
Run Code Online (Sandbox Code Playgroud)

我见过的原因之一是 -

newThread()为每个工作单元创建一个新线程.io()将使用线程池

但是这个论点对应用程序的影响是什么?还有什么其他方面?

android rx-java retrofit rx-android

83
推荐指数
1
解决办法
1万
查看次数

合并一个Observable列表并等待所有完成

TL; DR 如何转换Task.whenAll(List<Task>)RxJava

我现有的代码使用Bolts构建异步任务列表,并等待所有这些任务完成后再执行其他步骤.从本质上讲,它构建一个List<Task>并返回一个单独的Task,当列表中的所有任务完成时,按照Bolts站点上的示例标记为已完成.

我期待,以取代BoltsRxJava和我假设建立的异步任务列表(在事先不知道大小)和包装他们都到一个单一的这种方法Observable是可行的,但我不知道怎么办.

我试着看merge,zip,concat等...但不能去上工作List<Observable>,我会被建立,因为他们似乎都面向工作的只有两个Observables,如果我理解正确的文档在一个时间.

我正在努力学习RxJava并且仍然是新手,所以请原谅我,如果这是一个明显的问题,或者在某个地方的文档中解释过; 我试过搜索.任何帮助将非常感激.

java reactive-programming rx-java rx-android

76
推荐指数
3
解决办法
6万
查看次数

如何使用RxJava 2的CompositeDisposable?

在RxJava 1中,有CompositeSubscription,但在RxJava2中不存在,rxJava2中有一些CompositeDisposable.如何在RxJava2中使用CompositeDisposable或Disposable?

java android rx-java rx-android rx-java2

71
推荐指数
1
解决办法
5万
查看次数

如何忽略错误并继续无限流?

我想知道如何忽略异常并继续无限流(在我的情况下,位置流)?

我正在获取当前用户位置(使用Android-ReactiveLocation),然后将它们发送到我的API(使用Retrofit).

在我的情况下,当在网络调用(例如超时)期间发生异常时,调用onError方法并且流自行停止.怎么避免呢?

活动:

private RestService mRestService;
private Subscription mSubscription;
private LocationRequest mLocationRequest = LocationRequest.create()
            .setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY)
            .setInterval(100);
...
private void start() {
    mRestService = ...;
    ReactiveLocationProvider reactiveLocationProvider = new ReactiveLocationProvider(this);
    mSubscription = reactiveLocationProvider.getUpdatedLocation(mLocationRequest)
            .buffer(50)
            .flatMap(locations -> mRestService.postLocations(locations)) // can throw exception
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe();
}
Run Code Online (Sandbox Code Playgroud)

RestService:

public interface RestService {
    @POST("/.../")
    Observable<Response> postLocations(@Body List<Location> locations);
}
Run Code Online (Sandbox Code Playgroud)

android rx-java rx-android

39
推荐指数
4
解决办法
3万
查看次数

rxjava合并不同类型的observable

我是rxjava的新手.我需要组合两个发出不同类型对象的observable.喜欢的东西Observable<Milk>,并Observable<Cereals>和获得Observable<CerealsWithMilk>.对于像这样的事情,我找不到任何操作员.做这样的事情的rx方式是什么?请注意,Milk并且Cereals是异步的.

asynchronous observable rx-java rx-android

39
推荐指数
2
解决办法
2万
查看次数

RxJava:尝试将错误传播到Observer.onError时发生错误

我在Rx库中收到IllegalStateException错误,并且不知道问题根源的确切位置,无论是RxJava还是我可能做错了.

证书锁定(发生在所有服务器请求上)但似乎指向会话超时或注销并重新登录时发生致命崩溃.Repro步骤(大约25%的时间发生)如下:登录,打开列表项 - 滚动一路结束 - 注销 - 重新登录 - 打开应用程序 - 关闭应用程序 - >崩溃!

任何人对如何防止这种情况有任何想法?我发现Observer.onError类似的问题在不一致的情况下触发

java.lang.IllegalStateException: Fatal Exception thrown on Scheduler.Worker thread.
   at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:62)
   at android.os.Handler.handleCallback(Handler.java:615)
   at android.os.Handler.dispatchMessage(Handler.java:92)
   at android.os.Looper.loop(Looper.java:137)
   at android.app.ActivityThread.main(ActivityThread.java:4867)
   at java.lang.reflect.Method.invokeNative(Method.java)
   at java.lang.reflect.Method.invoke(Method.java:511)
   at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:1007)
   at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:774)
   at dalvik.system.NativeStart.main(NativeStart.java)
Caused by: rx.exceptions.OnErrorFailedException: Error occurred when trying to propagate error to Observer.onError
   at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:201)
   at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:111)
   at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber$2.call(OperatorObserveOn.java:159)
   at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
   at android.os.Handler.handleCallback(Handler.java:615)
   at android.os.Handler.dispatchMessage(Handler.java:92)
   at android.os.Looper.loop(Looper.java:137)
   at android.app.ActivityThread.main(ActivityThread.java:4867)
   at java.lang.reflect.Method.invokeNative(Method.java)
   at java.lang.reflect.Method.invoke(Method.java:511)
   at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:1007)
   at …
Run Code Online (Sandbox Code Playgroud)

android exception onerror rx-java rx-android

37
推荐指数
2
解决办法
2万
查看次数