我\xc2\xb4m 寻找运算符 SkipUntil,但似乎没有按我的预期工作。\n这是我的代码
\n\n@Test\npublic void testSkiUitil() throws InterruptedException {\n List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);\n Observable observable2 = Observable.just(1);\n Subscription subscription = Observable.from(numbers)\n .skipUntil(observable2)\n .subscribe(System.out::println);\n Thread.sleep(3000);\n observable2.subscribe();\n new TestSubscriber((Observer) subscription).awaitTerminalEvent(5, TimeUnit.SECONDS);\n\n}\nRun Code Online (Sandbox Code Playgroud)\n\n我试图证明,由于 observable2 没有任何订阅,因此不会发出任何项目,因此使用操作符skipUntil 的第一个 observable 应该跳过所有项目。但仍然发出所有 5 项。
\n\n知道为什么吗?
\n\n医生说。
\n\n Returns an Observable that skips items emitted by the source Observable until a second Observable emits\nRun Code Online (Sandbox Code Playgroud)\n 我的 Android 应用程序中有以下代码,试图防止多次单击按钮:
RxView.clicks(bSubmit)
.debounce(2500, TimeUnit.MILLISECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(c -> displayToast());
Run Code Online (Sandbox Code Playgroud)
但此代码的作用不是执行代码,然后防止在同一时间跨度内执行多次点击,而是在去抖时间跨度过去后执行命令。
我怎样才能实现我想要的?
rxjava2 版本 2.1.5
试图理解 RxJava2 对一个 observable 的多个订阅。有一个简单的文件监视服务,用于跟踪目录中文件的创建、修改、删除。我添加了 2 个订阅者,并希望在两个订阅者上都打印事件。当我将文件复制到监视目录中时,我看到一个订阅者打印出事件。然后,当我删除文件时,我看到第二个订阅者打印出事件。我期待两个订阅者都打印事件。我在这里缺少什么?
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
import java.io.IOException;
import java.nio.file.FileSystem;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.concurrent.TimeUnit;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
public class MyRxJava2DirWatcher {
public Flowable<WatchEvent<?>> createFlowable(WatchService watcher, Path path) {
return Flowable.create(subscriber -> {
boolean error = false;
WatchKey key;
try {
key = path.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
}
catch (IOException e) {
subscriber.onError(e);
error = true;
} …Run Code Online (Sandbox Code Playgroud) 以下代码打印 1、2
Observable.just(1)
.doOnSubscribe(d -> System.out.println(1))
.doOnSubscribe(d -> System.out.println(2))
.blockingSubscribe();
Run Code Online (Sandbox Code Playgroud)
这个打印 2, 1
Observable.just(1)
.doOnSubscribe(d -> System.out.println(1))
.subscribeOn(Schedulers.newThread())
.doOnSubscribe(d -> System.out.println(2))
.blockingSubscribe();
Run Code Online (Sandbox Code Playgroud)
在 RxJava1 中,这两个代码都打印“2, 1”,因为doOnSubscribe在下游订阅上游之前调用。
在 RxJava2 中,订阅是从上游到下游 ( Observer.onSubscribe),但doOnSubscribe仍会在订阅之前调用。于是混乱的秩序出现了。
即使我可以给出一个更混乱的情况:
Observable.just(1)
.doOnSubscribe(d -> System.out.println(1))
.doOnSubscribe(d -> System.out.println(2))
.subscribeOn(Schedulers.newThread())
.doOnSubscribe(d -> System.out.println(3))
.doOnSubscribe(d -> System.out.println(4))
.blockingSubscribe();
Run Code Online (Sandbox Code Playgroud)
它按照我的预期打印“3, 4, 1, 2”,但不是最预期的。
这是设计使然吗?如果是,有什么好处?
我正在尝试测试订阅冷 observable 的部分代码。我的问题是单元测试在断言结果之前没有等待内部可观察调用结束,因此我的测试失败。
这是我要测试的代码:
public class UserManager {
private UserRepository userRepository;
private PublishSubject<List<User>> usersSubject = PublishSubject.create();
public UserManager(UserRepository userRepository) {
this.userRepository = userRepository;
}
public Observable<List<User>> users() {
return usersSubject;
}
public void onUserIdsReceived(List<String> userIds) {
userRepository.getUsersInfo(userIds)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new SingleObserver<List<User>>() {
@Override
public void onSubscribe(@NonNull Disposable d) {}
@Override
public void onSuccess(@NonNull List<User> users) {
usersSubject.onNext(users);
}
@Override
public void onError(Throwable e) {}
}
}
}
Run Code Online (Sandbox Code Playgroud)
这是我的测试:
@RunWith(MockitoJUnitRunner::class)
class UserManagerTest {
private val userRepository = mock(UserRepository::class.java)
companion object {
@BeforeClass …Run Code Online (Sandbox Code Playgroud) 我想用 来模拟和测试我Presenter的Observable,但我不知道该怎么做,代码的主要部分如下:
//in my presenter:
override fun loadData(){
this.disposable?.dispose()
this.disposable =
Observable.create<List<Note>> {emitter->
this.notesRepository.getNotes {notes->
emitter.onNext(notes)
}
}
.doOnSubscribe {
this.view.showProgress()
}
.subscribe {
this.view.hideProgress()
this.view.displayNotes(it)
}
}
//in test:
@Test
fun load_notes_from_repository_and_display(){
val loadCallback = slot<(List<Note>)->Unit>();
every {
notesRepository.getNotes(capture(loadCallback))
} answers {
//Observable.just(FAKE_DATA)
loadCallback.invoke(FAKE_DATA)
}
notesListPresenter.loadData()
verifySequence {
notesListView.showProgress()
notesListView.hideProgress()
notesListView.displayNotes(FAKE_DATA)
}
}
Run Code Online (Sandbox Code Playgroud)
我得到了错误:
Verification failed: call 2 of 3: IView(#2).hideProgress()) was not called.
那么,如何在 Android 单元测试中使用 Mockk 测试 Rx 的东西呢?提前致谢!
对不起我的英语不好。我有 id 类别列表,我确实提出了这样的要求:
for(Product cat:all_cats) {
new DefaultMediator()
.getProductsInCatalog(cat.getID())
.timeout(15, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(list_products -> {
//do something
},
throwable -> {
Log.e("asd", "asd");
});
}
Run Code Online (Sandbox Code Playgroud)
我觉得不好。我如何使用 rxJava 发送请求列表?
我在 RxJava 中有一个函数,它试图根据条件找到一个事物,如果成功,它会转换并将其作为Observable. Observable.empty()如果找不到任何东西,我想返回,因此没有第一个。我不确定这一点,但我认为如果 filter 过滤掉 中的每个元素,Observable结果将是Observable.empty()无论如何(没有firstElement())。
void Observable<Thing> transformFirst(Observable<Thing> things,Predicate<Thing> condition){
return things
.filter(condition)
.firstElement()
.map(firstThing ->{...do sg...})
}
Run Code Online (Sandbox Code Playgroud)
编辑:
我的问题是firstElement()回报Maybe<Thing>,我不知道如何把它转换成一个Observable.empty()当filter(condition)过滤掉一切(conditionevaulates为false,每thing)
我想将列表中的每个对象都转换为另一个对象。但是这样做我的代码卡在将它们转换回 List
override fun myFunction(): LiveData<MutableList<MyModel>> {
return mySdk
.getAllElements() // Returns Flowable<List<CustomObject>>
.flatMap { Flowable.fromIterable(it) }
.map {MyModel(it.name!!, it.phoneNumber!!) }
.toList() //Debugger does not enter here
.toFlowable()
.onErrorReturn { Collections.emptyList() }
.subscribeOn(Schedulers.io())
.to { LiveDataReactiveStreams.fromPublisher(it) }
}
Run Code Online (Sandbox Code Playgroud)
一切都很好,直到映射。但调试器甚至不会停留在 toList 或 toList 之下的任何其他地方。我该如何解决这个问题?
我一直面临着主题和 TestScheduler 的问题。如果我使用 Trampoline 调度程序,我的测试会通过,但是如果我使用 TestScheduler,它们会由于某种原因失败。
这是我的示例测试和相关课程。
@RunWith(MockitoJUnitRunner::class)
class DemoViewModelTest {
//Error Mocks
private val actionsStream: PublishSubject<DemoContract.ViewEvent> = PublishSubject.create()
private lateinit var viewModel: DemoViewModel
private val handler = mock(DemoContract.Handler::class.java)
@Before
fun setup() {
viewModel = DemoViewModel(schedulersProvider, handler)
viewModel.viewEventsStream = actionsStream
}
@Test
fun testUpdateCounter() {
actionsStream.onNext(DemoContract.ViewEvent.UpdateClick)
testScheduler.triggerActions()
verify(handler).onUpdate()
}
protected var testScheduler = TestScheduler()
protected var schedulersProvider: SchedulersProvider = object : SchedulersProvider() {
override fun mainThread(): Scheduler {
return testScheduler
}
override fun io(): Scheduler {
return testScheduler
}
override …Run Code Online (Sandbox Code Playgroud) rx-java ×10
android ×6
rx-java2 ×5
unit-testing ×3
reactivex ×2
rx-android ×2
kotlin ×1
mockk ×1
observable ×1
request ×1
rx-binding ×1
testing ×1
watchservice ×1