我正在尝试使用 kotlin 学习适用于 Android 的 RxJava2,并且正在遵循这个很好的在线教程。首先,我在 gradle.build 文件中添加了这两行:
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.1.3'
Run Code Online (Sandbox Code Playgroud)
我尝试用以下代码实现 Observable 模式:
import io.reactivex.Observable
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
val observable = Observable.from(arrayOf(1, 2, 3, 4, 5, 6))
}
}
Run Code Online (Sandbox Code Playgroud)
这应该很容易工作,但我无法在 Observable 上调用 from() 运算符(未解析的引用:from)。所以基本上我在开始之前就陷入了困境。有人知道我做错了什么吗?
我正在尝试测试订阅冷 observable 的部分代码。我的问题是单元测试在断言结果之前没有等待内部可观察调用结束,因此我的测试失败。
这是我要测试的代码:
public class UserManager {
private UserRepository userRepository;
private PublishSubject<List<User>> usersSubject = PublishSubject.create();
public UserManager(UserRepository userRepository) {
this.userRepository = userRepository;
}
public Observable<List<User>> users() {
return usersSubject;
}
public void onUserIdsReceived(List<String> userIds) {
userRepository.getUsersInfo(userIds)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new SingleObserver<List<User>>() {
@Override
public void onSubscribe(@NonNull Disposable d) {}
@Override
public void onSuccess(@NonNull List<User> users) {
usersSubject.onNext(users);
}
@Override
public void onError(Throwable e) {}
}
}
}
Run Code Online (Sandbox Code Playgroud)
这是我的测试:
@RunWith(MockitoJUnitRunner::class)
class UserManagerTest {
private val userRepository = mock(UserRepository::class.java)
companion object {
@BeforeClass …Run Code Online (Sandbox Code Playgroud) 我的 TeamDao 中有这个
@Query("select * from teams where id = :teamId")
Flowable<Team> getRivals(int teamId);
Run Code Online (Sandbox Code Playgroud)
我像这样订阅这个
teamRepository.getRivals(61).
distinctUntilChanged().
observeOn(SchedulerProvider.getInstance().ui()).
subscribeOn(SchedulerProvider.getInstance().computation())
.subscribe(team -> Log.d("HomeActivity", team.getName()));
Run Code Online (Sandbox Code Playgroud)
每当 Team 表的任何行发生任何更改时,无论它是否带有 id 61 ,我都会看到我的 this 订阅者正在调用。
我在博客上读到 distinctUntilChanged() 正是用来避免这种情况的。
我的团队POJO是这样的
@Entity(tableName = "teams",
foreignKeys = {
@ForeignKey
(entity = User.class,
parentColumns = "id",
childColumns = "userId",
onDelete = ForeignKey.CASCADE),
@ForeignKey
(entity = Neighbourhood.class,
parentColumns = "id",
childColumns = "neighbourhoodId"
)})
public class Team {
@PrimaryKey
int id;
private String name;
private String imageUrl;
@Embedded(prefix = …Run Code Online (Sandbox Code Playgroud) 从数据源搜索项目时,我有以下 UI 流程:
Outcome.loading(true)Outcome.success(results)Outcome.loading(false)现在的问题是当应用程序在后台时调用 #2 和 #3。恢复应用程序,LiveData 观察者只会收到 #3 而不是 #2 导致未填充 RecyclerView 的通知。
处理这种情况的正确方法是什么?
class SearchViewModel @Inject constructor(
private val dataSource: MusicInfoRepositoryInterface,
private val scheduler: Scheduler,
private val disposables: CompositeDisposable) : ViewModel() {
private val searchOutcome = MutableLiveData<Outcome<List<MusicInfo>>>()
val searchOutcomLiveData: LiveData<Outcome<List<MusicInfo>>>
get() = searchOutcome
fun search(searchText: String) {
Timber.d(".loadMusicInfos")
if(searchText.isBlank()) {
return
}
dataSource.search(searchText)
.observeOn(scheduler.mainThread())
.startWith(Outcome.loading(true))
.onErrorReturn { throwable -> Outcome.failure(throwable) }
.doOnTerminate { searchOutcome.value …Run Code Online (Sandbox Code Playgroud) 我在 RxJava 中有一个函数,它试图根据条件找到一个事物,如果成功,它会转换并将其作为Observable. Observable.empty()如果找不到任何东西,我想返回,因此没有第一个。我不确定这一点,但我认为如果 filter 过滤掉 中的每个元素,Observable结果将是Observable.empty()无论如何(没有firstElement())。
void Observable<Thing> transformFirst(Observable<Thing> things,Predicate<Thing> condition){
return things
.filter(condition)
.firstElement()
.map(firstThing ->{...do sg...})
}
Run Code Online (Sandbox Code Playgroud)
编辑:
我的问题是firstElement()回报Maybe<Thing>,我不知道如何把它转换成一个Observable.empty()当filter(condition)过滤掉一切(conditionevaulates为false,每thing)
我想将列表中的每个对象都转换为另一个对象。但是这样做我的代码卡在将它们转换回 List
override fun myFunction(): LiveData<MutableList<MyModel>> {
return mySdk
.getAllElements() // Returns Flowable<List<CustomObject>>
.flatMap { Flowable.fromIterable(it) }
.map {MyModel(it.name!!, it.phoneNumber!!) }
.toList() //Debugger does not enter here
.toFlowable()
.onErrorReturn { Collections.emptyList() }
.subscribeOn(Schedulers.io())
.to { LiveDataReactiveStreams.fromPublisher(it) }
}
Run Code Online (Sandbox Code Playgroud)
一切都很好,直到映射。但调试器甚至不会停留在 toList 或 toList 之下的任何其他地方。我该如何解决这个问题?
我目前开始使用 micronaut 和 kotlin。我有 JPA 查询,结果大约有 100 万个。这些结果我想从这个 micronaut 服务流到另一个。
我的查询返回 aallQuery.resultStream类型java.util.stream。
发送服务的控制器:
@Get("/test{value1,value2,value3}")
fun getTestObjects(
value1: String,
value2: String,
value3: String
): Stream<TestObject> {
val entries = testRepository.findAllWhere(value1, value2, value3)
return entries
}
Run Code Online (Sandbox Code Playgroud)
接收服务的客户:
@Get("/data/test{value1,value2,value3}")
override fun getTestObjects(alue1: String,
value2: String,
value3: String) : Stream<TestObject>
Run Code Online (Sandbox Code Playgroud)
JPA 查询如下所示:
val cb = entityManager.criteriaBuilder
val cq = cb.createQuery(TestObject::class.java)
val rootEntry = cq.from(TestObject::class.java)
val predicates = mutableListOf<Predicate>()
predicates.add(cb.like(rootEntry.get<String>("value1"), value1))
predicates.add(cb.equal(rootEntry.get<String>("value2"), value2))
predicates.add(cb.equal(rootEntry.get<Int>("value3"), value3))
val cqAllWhere = cq.select(rootEntry)
.where(cb.or(*predicates.toTypedArray()))
val …Run Code Online (Sandbox Code Playgroud) 我正在开发应用程序,在应用程序启动时,我从 Rest 服务下载类别和帖子,以便将它们存储在 SQLite 数据库中。我有几个问题:
objects变量很奇怪。代码:
ItemsApi client = this.getClient(); // Retrofit2
List<Observable<?>> requests = new ArrayList<>();
requests.add(client.getCategories());
requests.add(client.getPosts());
Observable<Object> combined = Observable.zip(
requests,
new Function<Object[], Object>() {
@Override
public Object apply(Object[] objects) throws Exception {
Timber.d("Length %s", objects.length); // Length 2
Timber.d("objects.getClass() %s", objects.getClass()); // objects.getClass() class [Ljava.lang.Object;
return new Object();
}
});
Disposable disposable = combined.subscribe(
new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
Timber.d("Object %s", o.toString());
} …Run Code Online (Sandbox Code Playgroud) 在这段代码摘录中,我试图处理一堆数据,但它不能在 UI 线程上,否则体验可能是 ANR。我认为用 rxJava2 很容易做到这一点,但是,数据处理总是在主线程上运行。
数据加载在“演示者”中触发,如下所示:
void loadHistoricalDataFromFile(String filename){
view.showProgressDialog();
addDisposable(
model.loadHistoricalDataObservable(filename)
.subscribeOn(rxSchedulers.runOnBackground())
.observeOn(rxSchedulers.mainThread())
.subscribe(loadedSuccessfully -> {
view.hideProgressDialog();
if (loadedSuccessfully){
view.showSnackBar(R.string.simulator_loaded_data_success, LENGTH_SHORT);
} else {
view.showSnackBar(R.string.simulator_loaded_data_fail, LENGTH_INDEFINITE);
}
}));
}
Run Code Online (Sandbox Code Playgroud)
如您所见,我已经使用了 .subscribeOn(rxSchedulers.runOnBackground())
rxSchedulers.runOnBackground() 实现如下:
public class AppRxSchedulers implements RxSchedulers {
public static Executor backgroundExecutor = Executors.newCachedThreadPool();
public static Scheduler BACKGROUND_SCHEDULERS = Schedulers.from(backgroundExecutor);
public static Executor internetExecutor = Executors.newCachedThreadPool();
public static Scheduler INTERNET_SCHEDULERS = Schedulers.from(internetExecutor);
public static Executor singleExecutor = Executors.newSingleThreadExecutor();
public static Scheduler SINGLE_SCHEDULERS = Schedulers.from(singleExecutor);
@Override …Run Code Online (Sandbox Code Playgroud) 我知道DisposableObserver已经实现了Observer和Disposable。标记onSubscribe()方法为final并提供onStart()而不是它。
但我不明白这两者在行动上有什么区别。我什么时候应该使用Observeror DisposableObserver?
能否请您谈谈使用它们的优缺点?
rx-java2 ×10
android ×7
kotlin ×3
rx-java ×3
android-room ×2
reactivex ×2
android-mvvm ×1
java ×1
java-stream ×1
micronaut ×1
observable ×1
rest ×1
retrofit2 ×1
rx-android ×1
unit-testing ×1