标签: rx-java

任务取消如何在RxJava中工作?

我不清楚如何在RXJava中实现任务取消.

我对移植使用Guava构建的现有API很感兴趣ListenableFuture.我的用例如下:

  • 我有一个单一的操作,由一系列期货加入 Futures.transform()
  • 多个订户观察操作的最终未来.
  • 每个观察者都可以取消最后的未来,所有观察者都会看到取消事件.
  • 取消最终的未来导致其依赖性的取消,例如按顺序1- > 2- > 3,取消3传播到2,等等.

关于这一点,RxJava维基中的信息很少; 我可以找到的唯一引用取消提及Subscription相当于.NET的Disposable,但据我所知,Subscription仅提供取消订阅序列中后续值的功能.

我不清楚如何通过这个API实现"任何订阅者可以取消"语义.我是以错误的方式思考这个问题吗?

任何输入将不胜感激.

java rx-java

14
推荐指数
1
解决办法
1万
查看次数

为什么订阅不在新线程中执行?

我有这样的任务:

Observable.just(getMessagesFromDb()).
    subscribeOn(Schedulers.newThread()).
    observeOn(AndroidSchedulers.mainThread()).
    subscribe(incomingMessages -> {
    //do something
    });
Run Code Online (Sandbox Code Playgroud)

getMessagesFromDb方法在哪里同步获取消息,而不在内部进行多线程处理.根据subscribeOn方法的RxAndroid文档:

在指定的Scheduler上异步订阅Observvers到此Observable

还有我的问题 - 为什么数据库请求在主线程上执行?如何异步进行?

rx-java rx-android

14
推荐指数
1
解决办法
3717
查看次数

在Android中下载RxJava,OkHttp和Okio的进度

在我们的应用程序中,我使用此代码下载图像文件.我需要在UI上显示下载进度(以百分比形式下载的字节数).我如何在此代码中获得下载进度?我搜索了解决方案,但仍无法自行完成.

Observable<String> downloadObservable = Observable.create(
                    sub -> {
                        Request request = new Request.Builder()
                                .url(media.getMediaUrl())
                                .build();
                        Response response = null;
                        try {
                            response = http_client.newCall(request).execute();
                            if (response.isSuccessful()) {
                                Log.d(TAG, "response.isSuccessful()");
                                String mimeType = MimeTypeMap.getFileExtensionFromUrl(media.getMediaUrl());
                                File file = new File(helper.getTmpFolder() + "/" + helper.generateUniqueName() + "test." + mimeType);
                                BufferedSink sink = Okio.buffer(Okio.sink(file));
                                sink.writeAll(response.body().source());
                                sink.close();
                                sub.onNext(response.toString());
                                sub.onCompleted();
                            } else {
                                sub.onError(new IOException());
                            }
                        } catch (IOException e) {
                            e.printStackTrace();
                        }

                    }
            );

            Subscriber<String> mySubscriber = new Subscriber<String>() {
                @Override
                public void …
Run Code Online (Sandbox Code Playgroud)

android rx-java okhttp okio

14
推荐指数
1
解决办法
1万
查看次数

使用Web应用程序中的RxJava Observables无法解决性能提升的不可解决性

我正在执行一些测试来评估使用基于Observables的反应API是否真的有优势,而不是阻止传统的API.

整个例子可以在Githug上找到

令人惊讶的是,结果表明,输出结果是:

  • 最好的:返回包含阻塞操作的Callable/的REST服务DeferredResult.

  • 还不错:阻止REST服务.

  • 最糟糕的:返回DeferredResult的REST服务,其结果由RxJava Observable设置.

这是我的Spring WebApp:

申请:

@SpringBootApplication
public class SpringNioRestApplication {

   @Bean
    public ThreadPoolTaskExecutor executor(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        return executor;
    }

    public static void main(String[] args) {
        SpringApplication.run(SpringNioRestApplication.class, args);
    }
}
Run Code Online (Sandbox Code Playgroud)

SyncController:

@RestController("SyncRestController")
@Api(value="", description="Synchronous data controller")
public class SyncRestController {

    @Autowired
    private DataService dataService;

    @RequestMapping(value="/sync/data", method=RequestMethod.GET, produces="application/json")
    @ApiOperation(value = "Gets data", notes="Gets data synchronously")
    @ApiResponses(value={@ApiResponse(code=200, message="OK")})
    public List<Data> getData(){ …
Run Code Online (Sandbox Code Playgroud)

java netflix reactive-programming rx-java

14
推荐指数
1
解决办法
3094
查看次数

是否有可能在rxjava android中的订阅者的onNext()中获得2个值?

我有这样的观察

Observable.zip(observable, extObs, new Func2<List<UserProfile>, ArrayList<Extension>, UserProfile>() {
        @Override
        public UserProfile call(List<UserProfile> userProfiles, ArrayList<Extension> extensions) {


            return userProfiles.get(0);
        }
    }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).unsubscribeOn(Schedulers.io()).subscribe(new Subscriber<UserProfile>() {
        @Override
        public void onCompleted() {

        }

        @Override
        public void onError(Throwable e) {
            e.printStackTrace();
        }

        @Override
        public void onNext(UserProfile userProfile) {
            profileListener.onProfileSet(userProfile);
        }
    });
}
Run Code Online (Sandbox Code Playgroud)

我需要通过ArrayList中的方法profileListener.onProfileSet(userProfile);profileListener.onProfileSet(userProfile,extensions);

是否可以在zip中执行此操作,还是有任何其他rxjava方法来解决此类问题?

android rx-java

14
推荐指数
1
解决办法
7190
查看次数

RxJavaCallAdapterFactory无法转换为Factory

我正在尝试按照https://inthecheesefactory.com/blog/retrofit-2.0/en中的指南使用Retrofit 2和RxJava

在"与CallAdapter的RxJava集成"一节中解释了如何使用RxJava进行改造

Retrofit retrofit = new Retrofit.Builder()
    .baseUrl("http://api.nuuneoi.com/base/")
    .addConverterFactory(GsonConverterFactory.create())
    .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
    .build();
Run Code Online (Sandbox Code Playgroud)

但是在编译期间存在以下错误:

Error:(63, 71) error: incompatible types: RxJavaCallAdapterFactory cannot be converted to Factory
Run Code Online (Sandbox Code Playgroud)

我该如何解决?谢谢

rest android rx-java retrofit retrofit2

14
推荐指数
1
解决办法
8207
查看次数

像RxJava中的Subject一样排队

我正在寻找一个可以的主题(或类似的东西):

  1. 如果没有订阅者,可以接收项目并将其保留在队列或缓冲区中
  2. 一旦我们有订户,所有物品都被消耗掉并且再也不会被发射
  3. 我可以订阅/取消订阅主题

BehaviorSubject 几乎可以完成这项工作,但它保留了最后观察到的项目.

UPDATE

基于已接受的答案,我为单个观察项目制定了类似的解决方案.还添加了取消订阅部分以避免内存泄漏.

class LastEventObservable private constructor(
        private val onSubscribe: OnSubscribe<Any>,
        private val state: State
) : Observable<Any>(onSubscribe) {

    fun emit(value: Any) {
        if (state.subscriber.hasObservers()) {
            state.subscriber.onNext(value)
        } else {
            state.lastItem = value
        }
    }

    companion object {
        fun create(): LastEventObservable {
            val state = State()

            val onSubscribe = OnSubscribe<Any> { subscriber ->
                just(state.lastItem)
                        .filter { it != null }
                        .doOnNext { subscriber.onNext(it) }
                        .doOnCompleted { state.lastItem = null }
                        .subscribe()

                val subscription = state.subscriber.subscribe(subscriber) …
Run Code Online (Sandbox Code Playgroud)

rx-java

14
推荐指数
1
解决办法
5407
查看次数

RxJava,一个可观察的多个订阅者:publish().autoConnect()

我正在玩rxJava/rxAndroid,并且有一些非常基本的东西不像我期望的那样.我有一个可观察的和两个订阅者:

Observable<Integer> dataStream = Observable.just(1, 2, 3).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());

Log.d(TAG, "subscribing sub1...");
dataStream.subscribe(v -> System.out.println("Subscriber #1: "+ integer));

Log.d(TAG, "subscribing sub2...");
dataStream.subscribe(v -> System.out.println("Subscriber #2: "+ integer));
Run Code Online (Sandbox Code Playgroud)

这是输出:

D/RxJava: subscribing sub1...
D/RxJava: subscribing sub2...
D/RxJava: Subscriber #1: 1
D/RxJava: Subscriber #1: 2
D/RxJava: Subscriber #1: 3
D/RxJava: Subscriber #2: 1
D/RxJava: Subscriber #2: 2
D/RxJava: Subscriber #2: 3
Run Code Online (Sandbox Code Playgroud)

现在,我知道我可以通过使用避免重复计数,publish().autoConnect()但我试图首先理解这种默认行为.每当有人订阅observable时,它就会开始发出数字序列.我明白了.因此,当Subscriber 1连接时它开始发射物品.Subscriber 2马上连接,为什么不能获得价值呢?

这是我理解它的方式,从可观察的角度来看:

  1. 有人订阅了我,我应该开始发出物品
    [订阅者:1] [项目到EMIT:1,2,3]

  2. 向订户发出项目"1"
    [订阅者:1] [项目到EMIT:2,3]

  3. 有人订阅了我,当我完成后我将再次发出1,2,3
    [订阅者:1&2] [项目到EMIT:2,3,1,2,3]

  4. 向订户发出项目'2'
    [订阅者:1&2] [项目到EMIT:3,1,2,3]

  5. 向订户发出项目'3'
    [订阅者:1&2] …

reactive-programming rx-java

14
推荐指数
2
解决办法
1万
查看次数

在RxJava中链接单打

我在我的Android应用程序中使用RxJava和Retrofit向服务器发出网络请求.我正在使用RxJavaCallAdapterFactory,所以我可以让我的改装请求返回单打.在我的代码中,改造对象被命名为'api'.

这里的代码工作正常,但在这个例子中,我需要在制作播放列表之前检索userId.我将userId请求平面映射到API请求,在制作播放列表后,我需要再次使用平面地图将JSON响应转换为可用对象.

public JSONUser me;

public Single<String> getUserId(){

    if(me != null){
        return Single.just(me.getUserId());
    }

    return api.getMe().flatMap(new Func1<JSONUser, Single<String>>() {
        @Override
        public Single<String> call(JSONUser meResult) {
            me = meResult;
            return Single.just(me.getUserId());
        }
    });
}

public Single<Playlist> createPlaylist(String name) {

    final NewPlaylistConfig config = new NewPlaylistConfig(name);

    return getUserId().flatMap(new Func1<String, Single<Playlist>>() {
        @Override
        public Single<Playlist> call(String userId) {
            return api.createPlaylist(userId, config).flatMap(
                    new Func1<JSONPlaylist, Single<? extends SpotifyPlaylist>>() {
                        @Override
                        public Single<? extends Playlist> call(JSONPlaylist data) {
                            return Single.just(new Playlist(data));
                    }
            });
        }
    });
} …
Run Code Online (Sandbox Code Playgroud)

android promise chain rx-java

14
推荐指数
1
解决办法
5026
查看次数

如何在RxJava 2和Kotlin中将null传递给具有可空类型的Observable

我像这样初始化我的变量: -

 val user: BehaviorSubject<User?> user = BehaviorSubject.create()
Run Code Online (Sandbox Code Playgroud)

但我不能这样做.IDE抛出错误: -

user.onNext(null)
Run Code Online (Sandbox Code Playgroud)

这样做,IDE说你永远不会为空: -

user.filter( u -> u!=null)
Run Code Online (Sandbox Code Playgroud)

android kotlin rx-java rx-kotlin rx-java2

14
推荐指数
3
解决办法
1万
查看次数