流结束时我需要执行一些操作。这样做的惯用方法是什么?
现在我使用下面的代码:
source.subscribe(undefined, undefined, function() {
socket.send({type: 'end'});
});
Run Code Online (Sandbox Code Playgroud) 背景
我正在编写一些执行以下操作的软件:
当我第一次单击“开始”时,一切正常,但之后就不行了。第一次单击开始时,我得到如下输出:
这看起来不错,没有什么问题。但是,当我第二次单击“开始”时,我得到以下输出。
现在,我相信我知道为什么会发生这种情况。据我所知,从我第一次单击“开始”开始,我的观察者就从未取消订阅,因此所有内容都会打印两次。单击开始按钮时,会发生以下情况:
/// <summary>
/// Starts the test.
/// </summary>
/// <param name="sender">The "start" button.</param>
/// <param name="e">Clicking on the "start" button.</param>
private void button_go_Click(object sender, RoutedEventArgs e)
{
var uiContext = SynchronizationContext.Current;
var results = Observable.FromEventPattern<TestResultHandler, TestResultArgs>(
handler => (s, a) => handler(s, a),
handler => this.myTest.Results += handler,
handler => this.myTest.Results -= handler)
.ObserveOn(uiContext)
.Subscribe(x => this.richTextBox_testResults.Document.Blocks.Add(new Paragraph(new Run(x.EventArgs.Results))));
// start running the test
this.runningTest = new Task(() => …Run Code Online (Sandbox Code Playgroud) 我\xc2\xb4m 寻找运算符 SkipUntil,但似乎没有按我的预期工作。\n这是我的代码
\n\n@Test\npublic void testSkiUitil() throws InterruptedException {\n List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);\n Observable observable2 = Observable.just(1);\n Subscription subscription = Observable.from(numbers)\n .skipUntil(observable2)\n .subscribe(System.out::println);\n Thread.sleep(3000);\n observable2.subscribe();\n new TestSubscriber((Observer) subscription).awaitTerminalEvent(5, TimeUnit.SECONDS);\n\n}\nRun Code Online (Sandbox Code Playgroud)\n\n我试图证明,由于 observable2 没有任何订阅,因此不会发出任何项目,因此使用操作符skipUntil 的第一个 observable 应该跳过所有项目。但仍然发出所有 5 项。
\n\n知道为什么吗?
\n\n医生说。
\n\n Returns an Observable that skips items emitted by the source Observable until a second Observable emits\nRun Code Online (Sandbox Code Playgroud)\n 我有一个从 FirebaseAuthService获取Observable<auth>数据并将其放入公共变量中的方法auth$。
在我的应用程序中的某个位置,我需要多个组件来访问该auth$值以检查其内容。
我目前只是this.authService.auth$.subscribe(auth => this.auth = auth)在组件中做。
这是否意味着我要执行fetch data from Firebase多次?
如果是,我应该将 auth$ 定义为Subject/BehaviorSubject,订阅服务并使用 推送数据吗.next?
如何使用 RxAlamofire 发送多部分数据
例如在阿拉莫菲尔
let URL = try! URLRequest(url: "http://example.com", method: .post)
Alamofire.upload(multipartFormData: { formData in
// multiaprt
}, with: URL, encodingCompletion: {(result: SessionManager.MultipartFormDataEncodingResult) in
})
Run Code Online (Sandbox Code Playgroud)
我正在使用 monix 任务,并且尝试捕获 Throwable,然后转换为自定义错误。我已删除/更改代码以使其简单且相关。这是代码(问题在代码片段后面):
import io.netty.handler.codec.http.HttpRequest
import monix.reactive.Observable
import io.netty.buffer.ByteBuf
import monix.eval.Task
import com.mypackage.Response
private[this] def handler(
request: HttpRequest,
body: Observable[ByteBuf]
): Task[Response] = {
val localPackage = for {
failfast <- Task.eval(1 / 0)
} yield failfast
// Failure case.
localPackage.onErrorRecoverWith {
case ex: ArithmeticException =>
print(s"LOG HERE^^^^^^^^^^^^^^^")
return Task.now(
Response(HttpResponseStatus.BAD_REQUEST,
None,
None)
)
}.runAsync
// Success case.
localPackage.map { x =>
x match {
case Right(cool) =>
Response(
HttpResponseStatus.OK,
None,
cool
)
case Left(doesntmatter) => ???
}
} …Run Code Online (Sandbox Code Playgroud) 我的程序服务提供了一些返回的删除方法Mono<Void>,例如:fun delete(clientId: String) : Mono<Void>
调用后,.delete("x")我想向下游传播 clientId 以执行其他操作:
userService.get(id).map{ user ->
userService.delete(user.id) //This returns Mono<Void>
.map {
user.id //Never called!!!
}
.map { userId ->
//other calls using the propagated userId
}
}
Run Code Online (Sandbox Code Playgroud)
问题是因为 delete 返回 a Mono<Void>,所以.map {
user.id }永远不会调用以下内容。那么如何将 theMono<Void>转换为 aMono<String>来传播 userId?
我有一个Flux由专用Processor含义生成的无限实例,每个元素都是通过发出的sink.next(Receiver如果重要的话,元素来自反应式 Kafka )。问题是,每次我尝试做一些有用的事情时sort,Flux它只会给出一个空的结果。这也适用于reduce。
难道我做错了什么?
编辑
这是一个更具体的例子,它给出了一个空的Flux:
Flux.<Integer>create(sink -> {
sink.next(1);
sink.next(2);
sink.next(3);
sink.next(4);
})
.sort() // If I remove this everthing works as expected
.log()
.subscribe();
Run Code Online (Sandbox Code Playgroud)
谈到我的具体情况,这是我所拥有的简化版本:
FluxProcessor<Message, Message> processor = ReplayProcessor.<Message>createTimeout(Duration.ofDays(1)).serialize();
FluxSink<Message> sink = processor.sink();
Flux<Message> pipeline = processor;
kafka.receive()
.log()
.map(ReceiverRecord::value)
.subscribe(sink::next);
return pipeline; // Work with the pipeline later on
Run Code Online (Sandbox Code Playgroud)
然后,如果我尝试无论是.sort或.reduce将pipeline它总是导致空Flux。
我使用 Spring WebFlux(Project Reactor),但面临以下问题:我必须从 db 获取一些数据才能使用它们来调用另一项服务 - 一个流中的所有内容。怎么做?
public Mono<MyObj> saveObj(Mono<MyObj> obj) {
return obj
.flatMap(
ob->
Mono.zip(
repo1.save(
...),
repo2
.saveAll(...)
.collectList(),
repo3
.saveAll(...)
.collectList())
.map(this::createSpecificObject))
.doOnNext(item-> createObjAndCallAnotherService(item));
}
private void createObjAndCallAnotherService(Prot prot){
myRepository
.findById(
prot.getDomCred().stream()
.filter(Objects::nonNull)
.findFirst()
.map(ConfDomCred::getCredId)
.orElse(UUID.fromString("00000000-0000-0000-0000-000000000000")))
.doOnNext( //one value is returned from myRepository -> Flux<MyObjectWithNeededData>
confCred-> {//from this point the code is unreachable!!! - why????
Optional<ConfDomCred> confDomCred=
prot.getDomCreds().stream().filter(Objects::nonNull).findFirst();
confDomCred.ifPresent(
domCred -> {
ProtComDto com=
ProtComDto.builder()
.userName(confCred.getUsername())
.password(confCred.getPassword())
.build();
clientApiToAnotherService.callEndpintInAnotherService(com); //this is a client like Feign …Run Code Online (Sandbox Code Playgroud) 我正在使用 Angular 11,并且我正在使用异步管道访问组件模板中的一个 observable 。
路线的第一次加载,一切正常。没有错误。当我离开页面并返回时,出现以下错误:
组件模板:<RM-map *ngIf="(layers$ | async) as layer" [layers]="layers.layerConfig" showLayersPanel="true" id="RIS-map">
成分
import { Component, OnDestroy, OnInit } from '@angular/core';
import { Observable, Subject } from 'rxjs';
import { FullMapViewService } from '../services/full-map-view.service';
import { RISLayerConfigResponse } from '@RM/interfaces';
import { takeUntil } from 'rxjs/operators';
@Component({
selector: 'RM-full-map-view',
templateUrl: './full-map-view.component.html',
styleUrls: ['./full-map-view.component.scss']
})
export class FullMapViewComponent implements OnInit, OnDestroy {
layers$: Observable<RISLayerConfigResponse>;
destroyed$: Subject<boolean> = new Subject();
constructor(private fullMapViewService: FullMapViewService) {}
ngOnInit(): void { …Run Code Online (Sandbox Code Playgroud) javascript reactive-programming rxjs angular rxjs-observables