标签: rx-java

SkipUntil 无法按预期工作

我\xc2\xb4m 寻找运算符 SkipUntil,但似乎没有按我的预期工作。\n这是我的代码

\n\n
@Test\npublic void testSkiUitil() throws InterruptedException {\n    List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);\n    Observable observable2 = Observable.just(1);\n    Subscription subscription = Observable.from(numbers)\n                                          .skipUntil(observable2)\n                                          .subscribe(System.out::println);\n    Thread.sleep(3000);\n    observable2.subscribe();\n    new TestSubscriber((Observer) subscription).awaitTerminalEvent(5, TimeUnit.SECONDS);\n\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n

我试图证明,由于 observable2 没有任何订阅,因此不会发出任何项目,因此使用操作符skipUntil 的第一个 observable 应该跳过所有项目。但仍然发出所有 5 项。

\n\n

知道为什么吗?

\n\n

医生说。

\n\n
   Returns an Observable that skips items emitted by the source Observable until a second Observable emits\n
Run Code Online (Sandbox Code Playgroud)\n

reactive-programming observable rx-java reactivex

2
推荐指数
1
解决办法
1164
查看次数

RxView.debounce等待debounce的时间才执行命令,如何立即执行命令?

我的 Android 应用程序中有以下代码,试图防止多次单击按钮:

RxView.clicks(bSubmit)
            .debounce(2500, TimeUnit.MILLISECONDS)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(c -> displayToast());
Run Code Online (Sandbox Code Playgroud)

但此代码的作用不是执行代码,然后防止在同一时间跨度内执行多次点击,而是在去抖时间跨度过去后执行命令。

我怎样才能实现我想要的?

android rx-java rx-binding

2
推荐指数
1
解决办法
954
查看次数

RxJava2,2 个 Observable/Flowable 订阅者,但 onNext 被任何一个调用

rxjava2 版本 2.1.5

试图理解 RxJava2 对一个 observable 的多个订阅。有一个简单的文件监视服务,用于跟踪目录中文件的创建、修改、删除。我添加了 2 个订阅者,并希望在两个订阅者上都打印事件。当我将文件复制到监视目录中时,我看到一个订阅者打印出事件。然后,当我删除文件时,我看到第二个订阅者打印出事件。我期待两个订阅者都打印事件。我在这里缺少什么?

import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;

import java.io.IOException;
import java.nio.file.FileSystem;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.concurrent.TimeUnit;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

public class MyRxJava2DirWatcher {

    public Flowable<WatchEvent<?>> createFlowable(WatchService watcher, Path path) {

        return Flowable.create(subscriber -> {

            boolean error = false;
            WatchKey key;
            try {

                key = path.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
            }
            catch (IOException e) {
                subscriber.onError(e);
                error = true;
            } …
Run Code Online (Sandbox Code Playgroud)

watchservice rx-java rx-java2

2
推荐指数
1
解决办法
1672
查看次数

为什么 RxJava2 doOnSubscribe 以混乱的顺序运行?

以下代码打印 1、2

Observable.just(1)
    .doOnSubscribe(d -> System.out.println(1))
    .doOnSubscribe(d -> System.out.println(2))
    .blockingSubscribe();
Run Code Online (Sandbox Code Playgroud)

这个打印 2, 1

Observable.just(1)
    .doOnSubscribe(d -> System.out.println(1))
    .subscribeOn(Schedulers.newThread())
    .doOnSubscribe(d -> System.out.println(2))
    .blockingSubscribe();
Run Code Online (Sandbox Code Playgroud)

在 RxJava1 中,这两个代码都打印“2, 1”,因为doOnSubscribe在下游订阅上游之前调用。

在 RxJava2 中,订阅是从上游到下游 ( Observer.onSubscribe),但doOnSubscribe仍会在订阅之前调用。于是混乱的秩序出现了。

即使我可以给出一个更混乱的情况:

Observable.just(1)
    .doOnSubscribe(d -> System.out.println(1))
    .doOnSubscribe(d -> System.out.println(2))
    .subscribeOn(Schedulers.newThread())
    .doOnSubscribe(d -> System.out.println(3))
    .doOnSubscribe(d -> System.out.println(4))
    .blockingSubscribe();
Run Code Online (Sandbox Code Playgroud)

它按照我的预期打印“3, 4, 1, 2”,但不是最预期的。

这是设计使然吗?如果是,有什么好处?

rx-java rx-java2

2
推荐指数
1
解决办法
2961
查看次数

在单元测试中,如何在断言结果之前等待 RxJava2 Observable 完成

我正在尝试测试订阅冷 observable 的部分代码。我的问题是单元测试在断言结果之前没有等待内部可观察调用结束,因此我的测试失败。

这是我要测试的代码:

public class UserManager {

    private UserRepository userRepository;
    private PublishSubject<List<User>> usersSubject = PublishSubject.create();

    public UserManager(UserRepository userRepository) {
        this.userRepository = userRepository;
    }

    public Observable<List<User>> users() {
        return usersSubject;
    }

    public void onUserIdsReceived(List<String> userIds) {
        userRepository.getUsersInfo(userIds)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new SingleObserver<List<User>>() {
                @Override
                    public void onSubscribe(@NonNull Disposable d) {}

                    @Override
                    public void onSuccess(@NonNull List<User> users) {
                        usersSubject.onNext(users);
                    }

                    @Override
                    public void onError(Throwable e) {}
                }
    }
}
Run Code Online (Sandbox Code Playgroud)

这是我的测试:

@RunWith(MockitoJUnitRunner::class)
class UserManagerTest {

private val userRepository = mock(UserRepository::class.java)

companion object {
    @BeforeClass …
Run Code Online (Sandbox Code Playgroud)

android unit-testing rx-java rx-java2

2
推荐指数
1
解决办法
2453
查看次数

如何使用 Mockk 模拟和测试 RxJava/RxAndroid?

我想用 来模拟和测试我PresenterObservable,但我不知道该怎么做,代码的主要部分如下:

//in my presenter:
override fun loadData(){
    this.disposable?.dispose()
    this.disposable = 
        Observable.create<List<Note>> {emitter->
            this.notesRepository.getNotes {notes->
                emitter.onNext(notes)
            }
        }
            .doOnSubscribe {
                this.view.showProgress()
            }
            .subscribe {
                this.view.hideProgress()
                this.view.displayNotes(it)
            }
}

//in test:
@Test
fun load_notes_from_repository_and_display(){
    val loadCallback = slot<(List<Note>)->Unit>();
    every {
        notesRepository.getNotes(capture(loadCallback))
    } answers {
        //Observable.just(FAKE_DATA)
        loadCallback.invoke(FAKE_DATA)
    }
    notesListPresenter.loadData()
    verifySequence {
        notesListView.showProgress()
        notesListView.hideProgress()
        notesListView.displayNotes(FAKE_DATA)
    }
}
Run Code Online (Sandbox Code Playgroud)

我得到了错误: Verification failed: call 2 of 3: IView(#2).hideProgress()) was not called.

那么,如何在 Android 单元测试中使用 Mockk 测试 Rx 的东西呢?提前致谢!

android unit-testing rx-java rx-android mockk

2
推荐指数
1
解决办法
1385
查看次数

RxJava 请求列表

对不起我的英语不好。我有 id 类别列表,我确实提出了这样的要求:

for(Product cat:all_cats) {
            new DefaultMediator()
                    .getProductsInCatalog(cat.getID())
                    .timeout(15, TimeUnit.SECONDS)
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(list_products -> {
                                //do something
                            },
                            throwable -> {
                                Log.e("asd", "asd");
                            });
        }
Run Code Online (Sandbox Code Playgroud)

我觉得不好。我如何使用 rxJava 发送请求列表?

android request rx-java

2
推荐指数
1
解决办法
535
查看次数

如果 observable.filter().first() 什么都不返回,如何返回 Observable.empty()?

我在 RxJava 中有一个函数,它试图根据条件找到一个事物,如果成功,它会转换并将其作为Observable. Observable.empty()如果找不到任何东西,我想返回,因此没有第一个。我不确定这一点,但我认为如果 filter 过滤掉 中的每个元素,Observable结果将是Observable.empty()无论如何(没有firstElement())。

void Observable<Thing> transformFirst(Observable<Thing> things,Predicate<Thing> condition){
  return things
    .filter(condition)
    .firstElement()
    .map(firstThing ->{...do sg...})
}
Run Code Online (Sandbox Code Playgroud)

编辑:

我的问题是firstElement()回报Maybe<Thing>,我不知道如何把它转换成一个Observable.empty()filter(condition)过滤掉一切(conditionevaulates为false,每thing

rx-java reactivex rx-java2

2
推荐指数
1
解决办法
2471
查看次数

RxJava - 将列表的结果映射到另一个列表

我想将列表中的每个对象都转换为另一个对象。但是这样做我的代码卡在将它们转换回 List

override fun myFunction(): LiveData<MutableList<MyModel>> {
    return mySdk
            .getAllElements() // Returns Flowable<List<CustomObject>>
            .flatMap { Flowable.fromIterable(it) }
            .map {MyModel(it.name!!, it.phoneNumber!!) }
            .toList() //Debugger does not enter here
            .toFlowable()
            .onErrorReturn { Collections.emptyList() }
            .subscribeOn(Schedulers.io())
            .to { LiveDataReactiveStreams.fromPublisher(it) }
}
Run Code Online (Sandbox Code Playgroud)

一切都很好,直到映射。但调试器甚至不会停留在 toList 或 toList 之下的任何其他地方。我该如何解决这个问题?

android kotlin rx-java rx-java2

2
推荐指数
1
解决办法
2851
查看次数

PublishSubject 和 TestScheduler 的问题,未发出项目

我一直面临着主题和 TestScheduler 的问题。如果我使用 Trampoline 调度程序,我的测试会通过,但是如果我使用 TestScheduler,它们会由于某种原因失败。

这是我的示例测试和相关课程。

@RunWith(MockitoJUnitRunner::class)
class DemoViewModelTest  {


    //Error Mocks
    private val actionsStream: PublishSubject<DemoContract.ViewEvent> = PublishSubject.create()

    private lateinit var viewModel: DemoViewModel

    private val handler = mock(DemoContract.Handler::class.java)

    @Before
    fun setup() {
        viewModel = DemoViewModel(schedulersProvider, handler)
        viewModel.viewEventsStream = actionsStream
    }

    @Test
    fun testUpdateCounter() {
        actionsStream.onNext(DemoContract.ViewEvent.UpdateClick)
        testScheduler.triggerActions()
        verify(handler).onUpdate()

    }


    protected var testScheduler = TestScheduler()

    protected var schedulersProvider: SchedulersProvider = object : SchedulersProvider() {
        override fun mainThread(): Scheduler {
            return testScheduler
        }

        override fun io(): Scheduler {
            return testScheduler
        }

        override …
Run Code Online (Sandbox Code Playgroud)

testing android unit-testing rx-java rx-android

2
推荐指数
1
解决办法
1108
查看次数