我不清楚如何在RXJava中实现任务取消.
我对移植使用Guava构建的现有API很感兴趣ListenableFuture.我的用例如下:
Futures.transform()1- > 2- > 3,取消3传播到2,等等.关于这一点,RxJava维基中的信息很少; 我可以找到的唯一引用取消提及Subscription相当于.NET的Disposable,但据我所知,Subscription仅提供取消订阅序列中后续值的功能.
我不清楚如何通过这个API实现"任何订阅者可以取消"语义.我是以错误的方式思考这个问题吗?
任何输入将不胜感激.
我有这样的任务:
Observable.just(getMessagesFromDb()).
subscribeOn(Schedulers.newThread()).
observeOn(AndroidSchedulers.mainThread()).
subscribe(incomingMessages -> {
//do something
});
Run Code Online (Sandbox Code Playgroud)
getMessagesFromDb方法在哪里同步获取消息,而不在内部进行多线程处理.根据subscribeOn方法的RxAndroid文档:
在指定的Scheduler上异步订阅Observvers到此Observable
还有我的问题 - 为什么数据库请求在主线程上执行?如何异步进行?
在我们的应用程序中,我使用此代码下载图像文件.我需要在UI上显示下载进度(以百分比形式下载的字节数).我如何在此代码中获得下载进度?我搜索了解决方案,但仍无法自行完成.
Observable<String> downloadObservable = Observable.create(
sub -> {
Request request = new Request.Builder()
.url(media.getMediaUrl())
.build();
Response response = null;
try {
response = http_client.newCall(request).execute();
if (response.isSuccessful()) {
Log.d(TAG, "response.isSuccessful()");
String mimeType = MimeTypeMap.getFileExtensionFromUrl(media.getMediaUrl());
File file = new File(helper.getTmpFolder() + "/" + helper.generateUniqueName() + "test." + mimeType);
BufferedSink sink = Okio.buffer(Okio.sink(file));
sink.writeAll(response.body().source());
sink.close();
sub.onNext(response.toString());
sub.onCompleted();
} else {
sub.onError(new IOException());
}
} catch (IOException e) {
e.printStackTrace();
}
}
);
Subscriber<String> mySubscriber = new Subscriber<String>() {
@Override
public void …Run Code Online (Sandbox Code Playgroud) 我正在执行一些测试来评估使用基于Observables的反应API是否真的有优势,而不是阻止传统的API.
整个例子可以在Githug上找到
令人惊讶的是,结果表明,输出结果是:
最好的:返回包含阻塞操作的Callable/的REST服务DeferredResult.
还不错:阻止REST服务.
最糟糕的:返回DeferredResult的REST服务,其结果由RxJava Observable设置.
这是我的Spring WebApp:
申请:
@SpringBootApplication
public class SpringNioRestApplication {
@Bean
public ThreadPoolTaskExecutor executor(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
return executor;
}
public static void main(String[] args) {
SpringApplication.run(SpringNioRestApplication.class, args);
}
}
Run Code Online (Sandbox Code Playgroud)
SyncController:
@RestController("SyncRestController")
@Api(value="", description="Synchronous data controller")
public class SyncRestController {
@Autowired
private DataService dataService;
@RequestMapping(value="/sync/data", method=RequestMethod.GET, produces="application/json")
@ApiOperation(value = "Gets data", notes="Gets data synchronously")
@ApiResponses(value={@ApiResponse(code=200, message="OK")})
public List<Data> getData(){ …Run Code Online (Sandbox Code Playgroud) 我有这样的观察
Observable.zip(observable, extObs, new Func2<List<UserProfile>, ArrayList<Extension>, UserProfile>() {
@Override
public UserProfile call(List<UserProfile> userProfiles, ArrayList<Extension> extensions) {
return userProfiles.get(0);
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).unsubscribeOn(Schedulers.io()).subscribe(new Subscriber<UserProfile>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(UserProfile userProfile) {
profileListener.onProfileSet(userProfile);
}
});
}
Run Code Online (Sandbox Code Playgroud)
我需要通过ArrayList中的方法profileListener.onProfileSet(userProfile);为profileListener.onProfileSet(userProfile,extensions);
是否可以在zip中执行此操作,还是有任何其他rxjava方法来解决此类问题?
我正在尝试按照https://inthecheesefactory.com/blog/retrofit-2.0/en中的指南使用Retrofit 2和RxJava
在"与CallAdapter的RxJava集成"一节中解释了如何使用RxJava进行改造
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("http://api.nuuneoi.com/base/")
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.build();
Run Code Online (Sandbox Code Playgroud)
但是在编译期间存在以下错误:
Error:(63, 71) error: incompatible types: RxJavaCallAdapterFactory cannot be converted to Factory
Run Code Online (Sandbox Code Playgroud)
我该如何解决?谢谢
我正在寻找一个可以的主题(或类似的东西):
BehaviorSubject 几乎可以完成这项工作,但它保留了最后观察到的项目.
UPDATE
基于已接受的答案,我为单个观察项目制定了类似的解决方案.还添加了取消订阅部分以避免内存泄漏.
class LastEventObservable private constructor(
private val onSubscribe: OnSubscribe<Any>,
private val state: State
) : Observable<Any>(onSubscribe) {
fun emit(value: Any) {
if (state.subscriber.hasObservers()) {
state.subscriber.onNext(value)
} else {
state.lastItem = value
}
}
companion object {
fun create(): LastEventObservable {
val state = State()
val onSubscribe = OnSubscribe<Any> { subscriber ->
just(state.lastItem)
.filter { it != null }
.doOnNext { subscriber.onNext(it) }
.doOnCompleted { state.lastItem = null }
.subscribe()
val subscription = state.subscriber.subscribe(subscriber) …Run Code Online (Sandbox Code Playgroud) 我正在玩rxJava/rxAndroid,并且有一些非常基本的东西不像我期望的那样.我有一个可观察的和两个订阅者:
Observable<Integer> dataStream = Observable.just(1, 2, 3).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
Log.d(TAG, "subscribing sub1...");
dataStream.subscribe(v -> System.out.println("Subscriber #1: "+ integer));
Log.d(TAG, "subscribing sub2...");
dataStream.subscribe(v -> System.out.println("Subscriber #2: "+ integer));
Run Code Online (Sandbox Code Playgroud)
这是输出:
D/RxJava: subscribing sub1...
D/RxJava: subscribing sub2...
D/RxJava: Subscriber #1: 1
D/RxJava: Subscriber #1: 2
D/RxJava: Subscriber #1: 3
D/RxJava: Subscriber #2: 1
D/RxJava: Subscriber #2: 2
D/RxJava: Subscriber #2: 3
Run Code Online (Sandbox Code Playgroud)
现在,我知道我可以通过使用避免重复计数,publish().autoConnect()但我试图首先理解这种默认行为.每当有人订阅observable时,它就会开始发出数字序列.我明白了.因此,当Subscriber 1连接时它开始发射物品.Subscriber 2马上连接,为什么不能获得价值呢?
这是我理解它的方式,从可观察的角度来看:
有人订阅了我,我应该开始发出物品
[订阅者:1] [项目到EMIT:1,2,3]
向订户发出项目"1"
[订阅者:1] [项目到EMIT:2,3]
有人订阅了我,当我完成后我将再次发出1,2,3
[订阅者:1&2] [项目到EMIT:2,3,1,2,3]
向订户发出项目'2'
[订阅者:1&2] [项目到EMIT:3,1,2,3]
向订户发出项目'3'
[订阅者:1&2] …
我在我的Android应用程序中使用RxJava和Retrofit向服务器发出网络请求.我正在使用RxJavaCallAdapterFactory,所以我可以让我的改装请求返回单打.在我的代码中,改造对象被命名为'api'.
这里的代码工作正常,但在这个例子中,我需要在制作播放列表之前检索userId.我将userId请求平面映射到API请求,在制作播放列表后,我需要再次使用平面地图将JSON响应转换为可用对象.
public JSONUser me;
public Single<String> getUserId(){
if(me != null){
return Single.just(me.getUserId());
}
return api.getMe().flatMap(new Func1<JSONUser, Single<String>>() {
@Override
public Single<String> call(JSONUser meResult) {
me = meResult;
return Single.just(me.getUserId());
}
});
}
public Single<Playlist> createPlaylist(String name) {
final NewPlaylistConfig config = new NewPlaylistConfig(name);
return getUserId().flatMap(new Func1<String, Single<Playlist>>() {
@Override
public Single<Playlist> call(String userId) {
return api.createPlaylist(userId, config).flatMap(
new Func1<JSONPlaylist, Single<? extends SpotifyPlaylist>>() {
@Override
public Single<? extends Playlist> call(JSONPlaylist data) {
return Single.just(new Playlist(data));
}
});
}
});
} …Run Code Online (Sandbox Code Playgroud) 我像这样初始化我的变量: -
val user: BehaviorSubject<User?> user = BehaviorSubject.create()
Run Code Online (Sandbox Code Playgroud)
但我不能这样做.IDE抛出错误: -
user.onNext(null)
Run Code Online (Sandbox Code Playgroud)
这样做,IDE说你永远不会为空: -
user.filter( u -> u!=null)
Run Code Online (Sandbox Code Playgroud)