标签: rx-java2

无法在 Kotlin Android 中的 Observable 上调用 from() 运算符

我正在尝试使用 kotlin 学习适用于 Android 的 RxJava2,并且正在遵循这个很好的在线教程。首先,我在 gradle.build 文件中添加了这两行:

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.1.3'
Run Code Online (Sandbox Code Playgroud)

我尝试用以下代码实现 Observable 模式:

import io.reactivex.Observable

class MainActivity : AppCompatActivity() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        val observable = Observable.from(arrayOf(1, 2, 3, 4, 5, 6))

    }
}
Run Code Online (Sandbox Code Playgroud)

这应该很容易工作,但我无法在 Observable 上调用 from() 运算符(未解析的引用:from)。所以基本上我在开始之前就陷入了困境。有人知道我做错了什么吗?

android observable kotlin rx-java2

2
推荐指数
1
解决办法
775
查看次数

在单元测试中,如何在断言结果之前等待 RxJava2 Observable 完成

我正在尝试测试订阅冷 observable 的部分代码。我的问题是单元测试在断言结果之前没有等待内部可观察调用结束,因此我的测试失败。

这是我要测试的代码:

public class UserManager {

    private UserRepository userRepository;
    private PublishSubject<List<User>> usersSubject = PublishSubject.create();

    public UserManager(UserRepository userRepository) {
        this.userRepository = userRepository;
    }

    public Observable<List<User>> users() {
        return usersSubject;
    }

    public void onUserIdsReceived(List<String> userIds) {
        userRepository.getUsersInfo(userIds)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new SingleObserver<List<User>>() {
                @Override
                    public void onSubscribe(@NonNull Disposable d) {}

                    @Override
                    public void onSuccess(@NonNull List<User> users) {
                        usersSubject.onNext(users);
                    }

                    @Override
                    public void onError(Throwable e) {}
                }
    }
}
Run Code Online (Sandbox Code Playgroud)

这是我的测试:

@RunWith(MockitoJUnitRunner::class)
class UserManagerTest {

private val userRepository = mock(UserRepository::class.java)

companion object {
    @BeforeClass …
Run Code Online (Sandbox Code Playgroud)

android unit-testing rx-java rx-java2

2
推荐指数
1
解决办法
2453
查看次数

房间可流动多次调用

我的 TeamDao 中有这个

@Query("select * from teams where id = :teamId")
    Flowable<Team> getRivals(int teamId);
Run Code Online (Sandbox Code Playgroud)

我像这样订阅这个

teamRepository.getRivals(61).
                distinctUntilChanged().
                observeOn(SchedulerProvider.getInstance().ui()).

                subscribeOn(SchedulerProvider.getInstance().computation())
                .subscribe(team -> Log.d("HomeActivity", team.getName()));
Run Code Online (Sandbox Code Playgroud)

每当 Team 表的任何行发生任何更改时,无论它是否带有 id 61 ,我都会看到我的 this 订阅者正在调用。

我在博客上读到 distinctUntilChanged() 正是用来避免这种情况的。

我的团队POJO是这样的

@Entity(tableName = "teams",
        foreignKeys = {
                @ForeignKey
                        (entity = User.class,
                                parentColumns = "id",
                                childColumns = "userId",
                                onDelete = ForeignKey.CASCADE),
                @ForeignKey
                        (entity = Neighbourhood.class,
                                parentColumns = "id",
                                childColumns = "neighbourhoodId"
                        )})
public class Team {

    @PrimaryKey
    int id;
    private String name;
    private String imageUrl;

    @Embedded(prefix = …
Run Code Online (Sandbox Code Playgroud)

android rx-android rx-java2 android-room

2
推荐指数
1
解决办法
990
查看次数

使用 ViewModel、LiveData 和 RxJava 在 recyclerview 中处理数据和加载指示器的正确方法

从数据源搜索项目时,我有以下 UI 流程:

  1. 从源检索时显示进度指示器 -> 将实时数据分配给 Outcome.loading(true)
  2. 显示结果 -> 分配 LiveData Outcome.success(results)
  3. 隐藏进度指示器 -> 分配 LiveData Outcome.loading(false)

现在的问题是当应用程序在后台时调用 #2 和 #3。恢复应用程序,LiveData 观察者只会收到 #3 而不是 #2 导致未填充 RecyclerView 的通知。

处理这种情况的正确方法是什么?

class SearchViewModel @Inject constructor(
    private val dataSource: MusicInfoRepositoryInterface, 
    private val scheduler: Scheduler, 
    private val disposables: CompositeDisposable) : ViewModel() {

    private val searchOutcome = MutableLiveData<Outcome<List<MusicInfo>>>()
    val searchOutcomLiveData: LiveData<Outcome<List<MusicInfo>>>
        get() = searchOutcome

    fun search(searchText: String) {
        Timber.d(".loadMusicInfos")
        if(searchText.isBlank()) {
            return
        }

        dataSource.search(searchText)
                .observeOn(scheduler.mainThread())
                .startWith(Outcome.loading(true))
                .onErrorReturn { throwable -> Outcome.failure(throwable) }
                .doOnTerminate { searchOutcome.value …
Run Code Online (Sandbox Code Playgroud)

android kotlin android-mvvm rx-java2 android-livedata

2
推荐指数
1
解决办法
3837
查看次数

如果 observable.filter().first() 什么都不返回,如何返回 Observable.empty()?

我在 RxJava 中有一个函数,它试图根据条件找到一个事物,如果成功,它会转换并将其作为Observable. Observable.empty()如果找不到任何东西,我想返回,因此没有第一个。我不确定这一点,但我认为如果 filter 过滤掉 中的每个元素,Observable结果将是Observable.empty()无论如何(没有firstElement())。

void Observable<Thing> transformFirst(Observable<Thing> things,Predicate<Thing> condition){
  return things
    .filter(condition)
    .firstElement()
    .map(firstThing ->{...do sg...})
}
Run Code Online (Sandbox Code Playgroud)

编辑:

我的问题是firstElement()回报Maybe<Thing>,我不知道如何把它转换成一个Observable.empty()filter(condition)过滤掉一切(conditionevaulates为false,每thing

rx-java reactivex rx-java2

2
推荐指数
1
解决办法
2471
查看次数

RxJava - 将列表的结果映射到另一个列表

我想将列表中的每个对象都转换为另一个对象。但是这样做我的代码卡在将它们转换回 List

override fun myFunction(): LiveData<MutableList<MyModel>> {
    return mySdk
            .getAllElements() // Returns Flowable<List<CustomObject>>
            .flatMap { Flowable.fromIterable(it) }
            .map {MyModel(it.name!!, it.phoneNumber!!) }
            .toList() //Debugger does not enter here
            .toFlowable()
            .onErrorReturn { Collections.emptyList() }
            .subscribeOn(Schedulers.io())
            .to { LiveDataReactiveStreams.fromPublisher(it) }
}
Run Code Online (Sandbox Code Playgroud)

一切都很好,直到映射。但调试器甚至不会停留在 toList 或 toList 之下的任何其他地方。我该如何解决这个问题?

android kotlin rx-java rx-java2

2
推荐指数
1
解决办法
2851
查看次数

如何使用 micronaut 流式传输来自 JPA 的数据流?

我目前开始使用 micronaut 和 kotlin。我有 JPA 查询,结果大约有 100 万个。这些结果我想从这个 micronaut 服务流到另一个。

我的查询返回 aallQuery.resultStream类型java.util.stream

发送服务的控制器:

@Get("/test{value1,value2,value3}")
fun getTestObjects(
    value1: String,
    value2: String,
    value3: String
): Stream<TestObject> {
    val entries = testRepository.findAllWhere(value1, value2, value3)

    return entries
}
Run Code Online (Sandbox Code Playgroud)

接收服务的客户:

@Get("/data/test{value1,value2,value3}")
override fun getTestObjects(alue1: String,
    value2: String,
    value3: String) : Stream<TestObject>
Run Code Online (Sandbox Code Playgroud)

JPA 查询如下所示:

    val cb = entityManager.criteriaBuilder
    val cq = cb.createQuery(TestObject::class.java)

    val rootEntry = cq.from(TestObject::class.java)

    val predicates = mutableListOf<Predicate>()

    predicates.add(cb.like(rootEntry.get<String>("value1"), value1))
    predicates.add(cb.equal(rootEntry.get<String>("value2"), value2))
    predicates.add(cb.equal(rootEntry.get<Int>("value3"), value3))

    val cqAllWhere = cq.select(rootEntry)
        .where(cb.or(*predicates.toTypedArray()))

    val …
Run Code Online (Sandbox Code Playgroud)

rest reactive-programming java-stream rx-java2 micronaut

2
推荐指数
1
解决办法
305
查看次数

Android Retrofit2/RxJava2/Room - 简单的数据处理

我正在开发应用程序,在应用程序启动时,我从 Rest 服务下载类别和帖子,以便将它们存储在 SQLite 数据库中。我有几个问题:

  1. 如何确定哪个对象是类别和哪个帖子?或者我如何才能访问它们?objects变量很奇怪。
  2. 我应该将使用 Room 库在数据库中插入项目的代码放在哪里?
  3. 我需要为每个帖子下载图片,我应该在哪里下载?

代码:

ItemsApi client = this.getClient(); // Retrofit2

List<Observable<?>> requests = new ArrayList<>();
requests.add(client.getCategories());
requests.add(client.getPosts());

Observable<Object> combined = Observable.zip(
        requests,
        new Function<Object[], Object>() {
            @Override
            public Object apply(Object[] objects) throws Exception {
                Timber.d("Length %s", objects.length); // Length 2
                Timber.d("objects.getClass() %s", objects.getClass()); // objects.getClass() class [Ljava.lang.Object;

                return new Object();
            }
        });

Disposable disposable = combined.subscribe(
        new Consumer<Object>() {
            @Override
            public void accept(Object o) throws Exception {
               Timber.d("Object %s", o.toString());
            } …
Run Code Online (Sandbox Code Playgroud)

java android retrofit2 rx-java2 android-room

2
推荐指数
1
解决办法
349
查看次数

rxJava2 Single.Just() 总是在主线程中执行。如何让它在另一个线程上执行?

在这段代码摘录中,我试图处理一堆数据,但它不能在 UI 线程上,否则体验可能是 ANR。我认为用 rxJava2 很容易做到这一点,但是,数据处理总是在主线程上运行。

数据加载在“演示者”中触发,如下所示:

void loadHistoricalDataFromFile(String filename){
    view.showProgressDialog();
    addDisposable(
            model.loadHistoricalDataObservable(filename)
                    .subscribeOn(rxSchedulers.runOnBackground())
                    .observeOn(rxSchedulers.mainThread())
                    .subscribe(loadedSuccessfully -> {
                        view.hideProgressDialog();
                        if (loadedSuccessfully){
                            view.showSnackBar(R.string.simulator_loaded_data_success, LENGTH_SHORT);
                        } else {
                            view.showSnackBar(R.string.simulator_loaded_data_fail, LENGTH_INDEFINITE);
                        }
                    }));
}
Run Code Online (Sandbox Code Playgroud)

如您所见,我已经使用了 .subscribeOn(rxSchedulers.runOnBackground())

rxSchedulers.runOnBackground() 实现如下:

public class AppRxSchedulers implements RxSchedulers {


    public static Executor backgroundExecutor = Executors.newCachedThreadPool();
    public static Scheduler BACKGROUND_SCHEDULERS = Schedulers.from(backgroundExecutor);
    public static Executor internetExecutor = Executors.newCachedThreadPool();
    public static Scheduler INTERNET_SCHEDULERS = Schedulers.from(internetExecutor);
    public static Executor singleExecutor = Executors.newSingleThreadExecutor();
    public static Scheduler SINGLE_SCHEDULERS = Schedulers.from(singleExecutor);

    @Override …
Run Code Online (Sandbox Code Playgroud)

android rx-java2

2
推荐指数
1
解决办法
137
查看次数

RxJava2中Observer和DisposableObserver的区别

我知道DisposableObserver已经实现了ObserverDisposable。标记onSubscribe()方法为final并提供onStart()而不是它。

但我不明白这两者在行动上有什么区别。我什么时候应该使用Observeror DisposableObserver

能否请您谈谈使用它们的优缺点?

reactivex rx-java2

2
推荐指数
1
解决办法
550
查看次数