我是 RxJava2 的新手。
我有下一个代码:
fun signIn(): Completable = getCredentials() // get saved token
.onErrorResumeNext { makeLockSignInRequest() } // if token not saved then get it
.flatMap { refreshToken(it) } // refresh token
.doOnSuccess { credentialsManager.saveCredentials(it) } // save updated token
.doFinally { lock?.onDestroy(context) }!!
.toCompletable()
private fun getCredentials() = Single.create(SingleOnSubscribe<Credentials> {
credentialsManager.getCredentials(object : BaseCallback<Credentials, CredentialsManagerException> {
override fun onSuccess(payload: Credentials?) = it.onSuccess(payload!!)
override fun onFailure(error: CredentialsManagerException?) = it.onError(error!!)
})
})
private fun makeLockSignInRequest() = Single.create(SingleOnSubscribe<Credentials> {
lock = Lock.newBuilder(auth0, object …Run Code Online (Sandbox Code Playgroud) 我正在使用 MVVM 架构模式制作一个应用程序,我正在尝试使用 RxJava 在房间数据库中添加数据,但它在 lambda 表达式下显示红线:
lambda 表达式中的返回类型错误:void 无法转换为 Object。
下面是我的代码:
UserDao.java
@Dao
public interface UserDao {
@Insert
void insert(User user);
@Query("SELECT * FROM Users ORDER BY id DESC")
Flowable<List<User>> getAllUsers();
}
Run Code Online (Sandbox Code Playgroud)
用户存储库.java
public class UserRepository {
private UserDb userDb;
private UserDao userDao;
private Flowable<List<User>> allUsers;
public UserRepository(Application application) {
userDb = UserDb.getInstance(application);
userDao = userDb.userDao();
allUsers = userDao.getAllUsers();
}
public void insert(final User user){
Completable.fromCallable(() -> userDb.userDao().insert(user))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new CompletableObserver() {
@Override
public void onSubscribe(Disposable d) { …Run Code Online (Sandbox Code Playgroud) 您好,我目前正在准备一个带有协程的 MVVM 的简单演示示例,我面临以下问题。请查看代码以及代码有什么问题。
E/AndroidRuntime: FATAL EXCEPTION: main
Process: com.android.mvvmcoroutine.development, PID: 18974
java.lang.RuntimeException: Failed to invoke public io.reactivex.Observable() with no args
at com.google.gson.internal.ConstructorConstructor$3.construct(ConstructorConstructor.java:113)
at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$Adapter.read(ReflectiveTypeAdapterFactory.java:212)
at retrofit2.converter.gson.GsonResponseBodyConverter.convert(GsonResponseBodyConverter.java:39)
at retrofit2.converter.gson.GsonResponseBodyConverter.convert(GsonResponseBodyConverter.java:27)
at retrofit2.OkHttpCall.parseResponse(OkHttpCall.java:225)
at retrofit2.OkHttpCall$1.onResponse(OkHttpCall.java:121)
at okhttp3.RealCall$AsyncCall.run(RealCall.kt:138)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
at java.lang.Thread.run(Thread.java:919)
Caused by: java.lang.InstantiationException: Can't instantiate abstract class io.reactivex.Observable
at java.lang.reflect.Constructor.newInstance0(Native Method)
at java.lang.reflect.Constructor.newInstance(Constructor.java:343)
at com.google.gson.internal.ConstructorConstructor$3.construct(ConstructorConstructor.java:110)
at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$Adapter.read(ReflectiveTypeAdapterFactory.java:212)
at retrofit2.converter.gson.GsonResponseBodyConverter.convert(GsonResponseBodyConverter.java:39)
at retrofit2.converter.gson.GsonResponseBodyConverter.convert(GsonResponseBodyConverter.java:27)
at retrofit2.OkHttpCall.parseResponse(OkHttpCall.java:225)
at retrofit2.OkHttpCall$1.onResponse(OkHttpCall.java:121)
at okhttp3.RealCall$AsyncCall.run(RealCall.kt:138)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
at java.lang.Thread.run(Thread.java:919)
Run Code Online (Sandbox Code Playgroud)
依赖配置
'rxJavaVersion' : 'io.reactivex.rxjava2:rxjava:2.1.15'
'rxAndroidVersion' : 'io.reactivex.rxjava2:rxandroid:2.1.1' …Run Code Online (Sandbox Code Playgroud) 我将 Retrofit 与 Rxjava 一起使用来向服务器发出请求。
我的服务器返回定义的 json 格式,其中包括数据和定义的消息。
服务器返回http响应。如果服务器返回成功代码(200)就可以了。
但我希望,如果服务器返回其他代码,我将管理该响应的正文。
例如:
服务器返回401,我想读取响应正文以显示服务器的消息。
但是当服务器其他代码时,改造调用 onError 方法,我无法使用响应正文。
如何解决这个问题?
这是我的方法
'''
private void login(String username , String password){
view.setLoading();
source.loginUser(username, password)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new SingleObserver<Response<LoginResult>>() {
@Override
public void onSubscribe(Disposable d) {
disposable.add(d);
}
@Override
public void onSuccess(Response<LoginResult> loginResult) {
if (loginResult.isSuccessful()){
}
else
new AlertConfiguration(view.getViewActivity()).showMessage(loginResult.body().getMessage());
}
@Override
public void onError(Throwable e) {
if there is a problem
}
});
Run Code Online (Sandbox Code Playgroud)
'''
这是我改造的接口方法
@POST("...")
Single<Response<LoginResult>> loginUser(@Query("username") String username, @Query("password") String password);
Run Code Online (Sandbox Code Playgroud) 我在使用 RxJava 时遇到了一些麻烦。我正在使用 Kotlin 编码。这是我的问题:
我有一份单身人士名单。现在我需要所有 Singles 的发出结果才能继续。如果单打比赛能够并行进行并且结果保持相同的顺序,那就太好了。当所有单身人士都公布了他们的结果时,我想继续。
val list_of_singles = mutableListOf<Single<Type>>()
val results: List<ResultType> = runSingles(list_of_singles)
// use results here...
Run Code Online (Sandbox Code Playgroud)
如果您需要更多信息,请与我们联系。
谢谢!!!:)
有人可以帮忙吗?
我有这些功能
fun getBooks(): Single<List<Book>> {
return getCollections()
.map {
it.map(::collectonToBook)
}
}
fun getCollections(): Single<List<Collection>> {
return db.fetchCollections()
.filter(::isBook)
}
fun collectonToBook(collection: Collection): Maybe<Book> {
return collection.toBook()
}
Run Code Online (Sandbox Code Playgroud)
问题是 getBooksSingle<List<Maybe<Book>>>在我需要时返回Single<List<Book>>。我可以在流中执行此操作而不调用blockingGet吗?
我是 vertx 和 RxJava 的新手。我正在尝试实现一个简单的测试程序。但是,我无法理解这个程序的动态。为什么有些请求需要 10 秒以上才能响应?
以下是我的示例测试应用程序
public class Test {
public static void main(String[] args) {
Vertx vertx = Vertx.vertx();
WebClient webClient = WebClient.create(vertx);
Observable < Object > google = hitURL("www.google.com", webClient);
Observable < Object > yahoo = hitURL("www.yahoo.com", webClient);
for (int i = 0; i < 100; i++) {
google.repeat(100).subscribe(timeTaken -> {
if ((Long) timeTaken > 10000) {
System.out.println(timeTaken);
}
}, error -> {
System.out.println(error.getMessage());
});
yahoo.repeat(100).subscribe(timeTaken -> {
if ((Long) timeTaken > 10000) {
System.out.println(timeTaken);
} …Run Code Online (Sandbox Code Playgroud) 我正在改编来自三词地址的一些示例代码,以便通过 Java SDK 访问他们的 API。它使用 RXJava。
示例代码是:
Observable.fromCallable(() -> wrapper.convertTo3wa(new Coordinates(51.2423, -0.12423)).execute())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(result -> {
if (result.isSuccessful()) {
Log.i("MainActivity", String.format("3 word address: %s", result.getWords()));
} else {
Log.e("MainActivity", result.getError().getMessage());
}
});
Run Code Online (Sandbox Code Playgroud)
首先。这会在构建时给出弃用警告以及 IDE 警告 ( Result of 'Observable.subscribe()' is ignored)。
为了解决第一个问题,我Disposable myDisposable = 在Observable. 它是否正确?(添加位置见下文)
接下来,我需要添加超时,以便在请求超时时可以显示警告等。为此,我已添加.timeout(5000, TimeUnit.MILLISECONDS)到构建器中。
timeout这是可行的,但s 似乎对 s 起作用的方式Observable是它们抛出异常,而我不知道如何捕获和处理该异常。
我现在拥有的是:
Disposable myDisposable = Observable.fromCallable(() -> wrapper.convertTo3wa(new Coordinates(51.2423, -0.12423)).execute())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.timeout(5000, TimeUnit.MILLISECONDS)
.subscribe(result -> {
if (result.isSuccessful()) …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用Retrofit 2创建Weather应用程序,现在我很难正确设置呼叫.
这是正在运行的URL:
http://api.openweathermap.org/data/2.5/weather?q=London&APPID=MY_API_KEY
Run Code Online (Sandbox Code Playgroud)
所以,我有我的API密钥和BASE URL是:http://api.openweathermap.org ..这是我的Retrofit服务中的方法:
@GET("/data/2.5/weather?q={city}/&APPID={api}")
Observable<WeatherResponse> getWeather(@Path("city") String city, @Path("api") String api);
Run Code Online (Sandbox Code Playgroud)
而我得到的错误是:
java.lang.IllegalArgumentException:URL查询字符串"q = {city} /&APPID = {api}"必须没有替换块.对于动态查询参数,请使用@Query.
所以我试着这样:
@GET("/data/2.5/weather?{city}/&APPID={api}")
Observable<WeatherResponse> getWeather(@Query("city") String city, @Path("api") String api);
Run Code Online (Sandbox Code Playgroud)
我得到同样的错误......任何人都知道这里的交易是什么,我的网址有什么问题?
这是我的代码段。
@Override
public void onDestroy() {
super.onDestroy();
disposableSingleObserver.dispose();
}
/**
* fetches json by making http calls
*/
private void fetchContacts() {
disposableSingleObserver = new DisposableSingleObserver<List<Contact>>() {
@Override
public void onSuccess(List<Contact> movies) {
//Toast.makeText(getActivity(), "Success", Toast.LENGTH_SHORT).show();
contactList.clear();
contactList.addAll(movies);
mAdapter.notifyDataSetChanged();
// Received all notes
}
@Override
public void onError(Throwable e) {
// Network error
}
};
// Fetching all movies
apiService.getContacts()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(disposableSingleObserver);
}
Run Code Online (Sandbox Code Playgroud)
我收到警告,未使用subscribeWith的结果。
解决此问题的正确方法是什么?