标签: reactive-programming

使用 RxJS 处理流结束的惯用方法

流结束时我需要执行一些操作。这样做的惯用方法是什么?

现在我使用下面的代码:

source.subscribe(undefined, undefined, function() {
  socket.send({type: 'end'});
});
Run Code Online (Sandbox Code Playgroud)

reactive-programming system.reactive rxjs

2
推荐指数
1
解决办法
1716
查看次数

无法取消订阅 Rx

背景

我正在编写一些执行以下操作的软件:

  1. 用户单击“开始”。
  2. 启动一个任务来执行一些工作并启动事件来更新 GUI。
  3. 可观察对象使用任务中的事件,并将数据打印到 GUI 中的富文本框。

当我第一次单击“开始”时,一切正常,但之后就不行了。第一次单击开始时,我得到如下输出:

单击开始一次

这看起来不错,没有什么问题。但是,当我第二次单击“开始”时,我得到以下输出。

再次点击开始

现在,我相信我知道为什么会发生这种情况。据我所知,从我第一次单击“开始”开始,我的观察者就从未取消订阅,因此所有内容都会打印两次。单击开始按钮时,会发生以下情况:

    /// <summary>
    /// Starts the test.
    /// </summary>
    /// <param name="sender">The "start" button.</param>
    /// <param name="e">Clicking on the "start" button.</param>
    private void button_go_Click(object sender, RoutedEventArgs e)
    {
        var uiContext = SynchronizationContext.Current;
        var results = Observable.FromEventPattern<TestResultHandler, TestResultArgs>(
            handler => (s, a) => handler(s, a),
            handler => this.myTest.Results += handler,
            handler => this.myTest.Results -= handler)
            .ObserveOn(uiContext)
            .Subscribe(x => this.richTextBox_testResults.Document.Blocks.Add(new Paragraph(new Run(x.EventArgs.Results))));

        // start running the test
        this.runningTest = new Task(() => …
Run Code Online (Sandbox Code Playgroud)

c# task reactive-programming winforms system.reactive

2
推荐指数
1
解决办法
1181
查看次数

SkipUntil 无法按预期工作

我\xc2\xb4m 寻找运算符 SkipUntil,但似乎没有按我的预期工作。\n这是我的代码

\n\n
@Test\npublic void testSkiUitil() throws InterruptedException {\n    List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);\n    Observable observable2 = Observable.just(1);\n    Subscription subscription = Observable.from(numbers)\n                                          .skipUntil(observable2)\n                                          .subscribe(System.out::println);\n    Thread.sleep(3000);\n    observable2.subscribe();\n    new TestSubscriber((Observer) subscription).awaitTerminalEvent(5, TimeUnit.SECONDS);\n\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n

我试图证明,由于 observable2 没有任何订阅,因此不会发出任何项目,因此使用操作符skipUntil 的第一个 observable 应该跳过所有项目。但仍然发出所有 5 项。

\n\n

知道为什么吗?

\n\n

医生说。

\n\n
   Returns an Observable that skips items emitted by the source Observable until a second Observable emits\n
Run Code Online (Sandbox Code Playgroud)\n

reactive-programming observable rx-java reactivex

2
推荐指数
1
解决办法
1164
查看次数

在组件之间共享 Auth Observable 值,无需多次执行

我有一个从 FirebaseAuthService获取Observable<auth>数据并将其放入公共变量中的方法auth$

在我的应用程序中的某个位置,我需要多个组件来访问该auth$值以检查其内容。

我目前只是this.authService.auth$.subscribe(auth => this.auth = auth)在组件中做。

  1. 这是否意味着我要执行fetch data from Firebase多次?

  2. 如果是,我应该将 auth$ 定义为Subject/BehaviorSubject,订阅服务并使用 推送数据吗.next

subject reactive-programming observable rxjs angular

2
推荐指数
1
解决办法
918
查看次数

RxAlamofire发布多部分数据

如何使用 RxAlamofire 发送多部分数据

例如在阿拉莫菲尔

let URL = try! URLRequest(url: "http://example.com", method: .post)


Alamofire.upload(multipartFormData: { formData in
    // multiaprt
}, with: URL, encodingCompletion: {(result: SessionManager.MultipartFormDataEncodingResult) in

})
Run Code Online (Sandbox Code Playgroud)
  1. 如何使用RxAlamofire获得相同的行为/功能?
  2. 或者将此函数包装在 Observable 中的方法?

reactive-programming alamofire rx-swift swift3 rxalamofire

2
推荐指数
1
解决办法
2627
查看次数

如何在 monix onErrorHandle 中处理未处理的异常抛出

我正在使用 monix 任务,并且尝试捕获 Throwable,然后转换为自定义错误。我已删除/更改代码以使其简单且相关。这是代码(问题在代码片段后面):

import io.netty.handler.codec.http.HttpRequest
import monix.reactive.Observable
import io.netty.buffer.ByteBuf
import monix.eval.Task
import com.mypackage.Response


private[this] def handler(
      request: HttpRequest,
      body: Observable[ByteBuf]
  ): Task[Response] = {

    val localPackage = for {
      failfast <- Task.eval(1 / 0)
    } yield failfast

    // Failure case.
    localPackage.onErrorRecoverWith {
        case ex: ArithmeticException =>
          print(s"LOG HERE^^^^^^^^^^^^^^^")
          return Task.now(
            Response(HttpResponseStatus.BAD_REQUEST,
                     None,
                     None)
          )
    }.runAsync

    // Success case.
    localPackage.map { x => 
       x match {
        case Right(cool) =>
          Response(
            HttpResponseStatus.OK,
            None,
            cool
          )
        case Left(doesntmatter) => ???
      }
  } …
Run Code Online (Sandbox Code Playgroud)

scala reactive-programming monix

2
推荐指数
1
解决办法
1465
查看次数

在 Mono&lt;String&gt; 中转换 Mono&lt;Void&gt; 添加一个值

我的程序服务提供了一些返回的删除方法Mono<Void>,例如:fun delete(clientId: String) : Mono<Void>

调用后,.delete("x")我想向下游传播 clientId 以执行其他操作:

userService.get(id).map{ user -> 
        userService.delete(user.id) //This returns Mono<Void>
    .map { 
        user.id //Never called!!!
    }
    .map { userId -> 
        //other calls using the propagated userId
    }
}
Run Code Online (Sandbox Code Playgroud)

问题是因为 delete 返回 a Mono<Void>,所以.map { user.id }永远不会调用以下内容。那么如何将 theMono<Void>转换为 aMono<String>来传播 userId?

reactive-programming project-reactor spring-webflux

2
推荐指数
1
解决办法
1999
查看次数

如何对无限通量进行排序?

我有一个Flux由专用Processor含义生成的无限实例,每个元素都是通过发出的sink.nextReceiver如果重要的话,元素来自反应式 Kafka )。问题是,每次我尝试做一些有用的事情时sortFlux它只会给出一个空的结果。这也适用于reduce

难道我做错了什么?

编辑

这是一个更具体的例子,它给出了一个空的Flux

Flux.<Integer>create(sink -> {
  sink.next(1);
  sink.next(2);
  sink.next(3);
  sink.next(4);
})
    .sort() // If I remove this everthing works as expected
    .log()
    .subscribe();
Run Code Online (Sandbox Code Playgroud)

谈到我的具体情况,这是我所拥有的简化版本:

    FluxProcessor<Message, Message> processor = ReplayProcessor.<Message>createTimeout(Duration.ofDays(1)).serialize();
    FluxSink<Message> sink = processor.sink();
    Flux<Message> pipeline = processor;

    kafka.receive()
        .log()
        .map(ReceiverRecord::value)
        .subscribe(sink::next);

    return pipeline; // Work with the pipeline later on
Run Code Online (Sandbox Code Playgroud)

然后,如果我尝试无论是.sort.reducepipeline它总是导致空Flux

java reactive-programming project-reactor spring-webflux

2
推荐指数
1
解决办法
2032
查看次数

Spring WebFlux - 如何从数据库获取数据以供下一步使用

我使用 Spring WebFlux(Project Reactor),但面临以下问题:我必须从 db 获取一些数据才能使用它们来调用另一项服务 - 一个流中的所有内容。怎么做?

public Mono<MyObj> saveObj(Mono<MyObj> obj) {
    return obj
        .flatMap(
            ob->
                Mono.zip(
                        repo1.save(
                          ...),
                        repo2
                            .saveAll(...)
                            .collectList(),
                        repo3
                            .saveAll(...)
                            .collectList())
                    .map(this::createSpecificObject))
        .doOnNext(item-> createObjAndCallAnotherService(item));
  }



private void createObjAndCallAnotherService(Prot prot){
myRepository
        .findById(
            prot.getDomCred().stream()
                .filter(Objects::nonNull)
                .findFirst()
                .map(ConfDomCred::getCredId)
                .orElse(UUID.fromString("00000000-0000-0000-0000-000000000000")))
        .doOnNext( //one value is returned from myRepository -> Flux<MyObjectWithNeededData>
            confCred-> {//from this point the code is unreachable!!! - why????
              Optional<ConfDomCred> confDomCred=
                  prot.getDomCreds().stream().filter(Objects::nonNull).findFirst();

              confDomCred.ifPresent(
                  domCred -> {
                    ProtComDto com=
                        ProtComDto.builder()
                            .userName(confCred.getUsername())
                            .password(confCred.getPassword())                          
                            .build();
                    clientApiToAnotherService.callEndpintInAnotherService(com); //this is a client like Feign …
Run Code Online (Sandbox Code Playgroud)

java reactive-programming project-reactor spring-webflux

2
推荐指数
1
解决办法
1529
查看次数

Angular 模板中可观察对象上的 ObjectUnsubscribedErrorImpl

我正在使用 Angular 11,并且我正在使用异步管道访问组件模板中的一个 observable 。

路线的第一次加载,一切正常。没有错误。当我离开页面并返回时,出现以下错误:

组件模板:<RM-map *ngIf="(layers$ | async) as layer" [layers]="layers.layerConfig" showLayersPanel="true" id="RIS-map">

成分

import { Component, OnDestroy, OnInit } from '@angular/core';
import { Observable, Subject } from 'rxjs';

import { FullMapViewService } from '../services/full-map-view.service';
import { RISLayerConfigResponse } from '@RM/interfaces';
import { takeUntil } from 'rxjs/operators';

@Component({
  selector: 'RM-full-map-view',
  templateUrl: './full-map-view.component.html',
  styleUrls: ['./full-map-view.component.scss']
})
export class FullMapViewComponent implements OnInit, OnDestroy {
  layers$: Observable<RISLayerConfigResponse>;
  destroyed$: Subject<boolean> = new Subject();
  constructor(private fullMapViewService: FullMapViewService) {}

  ngOnInit(): void { …
Run Code Online (Sandbox Code Playgroud)

javascript reactive-programming rxjs angular rxjs-observables

2
推荐指数
2
解决办法
654
查看次数