标签: reactive-programming

Vuejs:组件之间共享状态

我想知道在 Vuejs 中组件之间实现共享状态的最佳实践。

想象一下情况 A:您有一个显示模式的 Web 应用程序。模态具有布尔状态show。如果单击模式确定按钮,而且单击背景的任何部分,甚至可能在某些服务器上推送状态更改,则该状态应该更改。因此,模式应该能够像父应用程序一样更改状态。

情况 B:您有一个 Web 应用程序,它显示共享公共数据的不同组件内的输入字段value。如果用户value通过一个组件中的字段进行更改,则它也应该在另一组件中进行更新。同样,两者甚至都应该在服务器推送事件上更新。

问题:

  • 我是对的,解决这个问题的正确方法是使用vuex并使共享状态成为一个存储字段,所有需要修改该值的组件/父级都可以观察并通过发出的操作进行更改?

  • 这是否不会引入我们从Meteor中了解到的这种危险的(因为难以处理)魔法反应?

  • 如何最好地记录流程,取决于什么?

reactive-programming vue.js vuex vuejs2

1
推荐指数
1
解决办法
2186
查看次数

如何对 ActorPublisher 进行反压

我正在编写一些示例来理解 akka 流和背压。我正在尝试了解缓慢的消费者背压对 AkkaPublisher 有何影响

我的代码如下。

class DataPublisher extends ActorPublisher[Int] {

  import akka.stream.actor.ActorPublisherMessage._

  var items: List[Int] = List.empty

  def receive = {
    case s: String =>
      println(s"Producer buffer size ${items.size}")
      if (totalDemand == 0)
        items = items :+ s.toInt
      else
        onNext(s.toInt)

    case Request(demand) =>
      if (demand > items.size) {
        items foreach (onNext)
        items = List.empty
      }
      else {
        val (send, keep) = items.splitAt(demand.toInt)
        items = keep
        send foreach (onNext)
      }


    case other =>
      println(s"got other $other")
  }
}
Run Code Online (Sandbox Code Playgroud)

Source.fromPublisher(ActorPublisher[Int](dataPublisherRef)).runWith(sink) …
Run Code Online (Sandbox Code Playgroud)

scala reactive-programming akka reactive-streams akka-stream

1
推荐指数
1
解决办法
758
查看次数

触发CombineLatest在Combine中传播初始值

我有两个字符串发布者和一个返回 AnyPublisher 的计算属性。逻辑很简单,但我想知道是否有任何方法可以传播初始值。我认为这应该是可能的,因为出版商有初始值。

在 VC 中,我从 ViewModel(从 textField)为 Publishers 分配新值。

firstTextField.addTarget(self, action: #selector(firstTextFieldDidChange(_:)), for: .editingChanged)
secondTextField.addTarget(self, action: #selector(secondTextFieldDidChange(_:)), for: .editingChanged)

@objc private func firstTextFieldDidChange(_ textField: UITextField) {
 viewModel.firstPublisher = textField.text ?? ""
}
@objc private func secondTextFieldDidChange(_ textField: UITextField) {
 viewModel.secondPublisher = textField.text ?? ""
}
Run Code Online (Sandbox Code Playgroud)

然后我将 Publisher (combineLatest) 分配给我的按钮:

_ = viewModel.validatedText
   .receive(on: RunLoop.main)
   .assign(to: \.isEnabled, on: button)
Run Code Online (Sandbox Code Playgroud)

在 VM 中我有两个发布者:

@Published var firstPublisher: String = ""
@Published var secondPublisher: String = ""
Run Code Online (Sandbox Code Playgroud)

并结合最新:

var validatedText: AnyPublisher<Bool, Never> { …
Run Code Online (Sandbox Code Playgroud)

reactive-programming swift combinelatest combine

1
推荐指数
1
解决办法
3241
查看次数

Spring WebClient 下载图像

我一直在研究响应式编程,最近尝试使用 Spring WebFlux 构建 POC。我想从简单开始,只使用 WebClient 下载图像;特别是https://greatatmosphere.files.wordpress.com/2013/02/great-atmosphere-149-tenaya-lake-yosemite-national-park-2.jpg

我尝试过以下代码

    String block = WebClient.create("https://greatatmosphere.files.wordpress.com/2013/02/great-atmosphere-149-tenaya-lake-yosemite-national-park-2.jpg")
            .get()
            .accept(MediaType.IMAGE_JPEG)
            .retrieve()
            .bodyToMono(String.class)
            .doOnError(WebClientResponseException.class,
                    ex -> System.out.println(ex.getStatusCode() + ": " + ex.getResponseBodyAsString()))
            .log()
            .block();
    System.out.println("output:" + block);
Run Code Online (Sandbox Code Playgroud)

但它没有按预期工作。看起来数据在不断地流式传输并且获取请求不会终止。

我确信我错过了一些简单的东西,但我似乎无法弄清楚。我尝试了多种参数,但结果都是一样的。

如何使用WebClient下载图像然后终止?

java reactive-programming spring-webflux

1
推荐指数
1
解决办法
4989
查看次数

使用 R2BC Kotlin 协程进行数据库事务

我在 Spring Webflux 项目中使用 R2bc 和 Kotlin。它运行良好。但我有这个方法

@Component
class UserRepository(private val client: DatabaseClient, private val operator: TransactionalOperator) {

    suspend fun updateUser(user: User, value: String): Int {

        client.execute("INSERT INTO log(user_id, activity) VALUES (:user_id, :activity)")
              .bind("activity", user.activity)
              .bind("user_id", user.id)
              .fetch()
              .awaitRowsUpdated()

        return client.execute("UPDATE users SET value = :value WHERE id = :id")
                     .bind("value", value)
                     .bind("id", user.id)
                     .fetch()
                     .awaitRowsUpdated()
}
Run Code Online (Sandbox Code Playgroud)

此方法有效,但我想使用数据库事务。Kotlin 是否支持。

reactive-programming kotlin reactive spring-webflux kotlin-coroutines

1
推荐指数
1
解决办法
1998
查看次数

用于切换服务器的反应式 Webflux - 有好处吗?

我们需要实现一个简单的切换服务器(其余应用程序),该服务器将获取切换名称并在启用或禁用时返回。我们预计每天会有数十万个请求的负载。

Spring(反应式)webflux 在这里有意义吗?

我的理解是,如果 http 线程有任何空闲时间的可能性,反应式休息 api 将很有用 - 这意味着线程等待完成某些工作,并且在收到来自数据库读取或休息调用的响应之前无法继续进行其他服务。

我们的用例只是返回正在查询的切换值(可能来自某些缓存)。反应式休息服务对我们的情况有用吗?与简单的 Spring Boot 应用程序相比,它有什么优势吗?

java spring reactive-programming spring-boot spring-webflux

1
推荐指数
1
解决办法
442
查看次数

项目反应堆:在订阅和发布的情况下线程将如何创建,流程如何?,堆栈跟踪?

简单的例子来理解线程流程。

  1. [ gshp subscribedOn-1 ] INFOreactor.Flux.FlatMap.1 -onSubscribe ( FluxFlatMap.FlatMapMain )

  2. [ gshppublishOn-7 ] INFOreactor.Flux.FlatMap.1 - onNext(6)

这里reactor.Flux.FlatMap.1对于gshp subscribedOn-1gshppublishOn-7是通用的

当我们运行java时,它从主线程开始,之后会发生什么,它会创建gshp subscribedOn-1还是reactor.Flux.FlatMap.1

  @Test
  public void setUpTestTest() {
      Scheduler scheduler1 = Schedulers.newParallel("gshp subscribedOn", 3);
      Scheduler scheduler2 = Schedulers.newParallel("gshp publishOn", 6);
      Flux<String> flux = Flux.range(1, 200)
                              .flatMap(s-> Flux.just(""+s)
                                               .publishOn(scheduler2)
                                               .concatMap(d->processMessagefluxpause(d, "test")))
                                               .log()
                              .subscribeOn(scheduler1);

    StepVerifier.create(flux).expectNextCount(20).verifyComplete();
}
Run Code Online (Sandbox Code Playgroud)

这意味着什么,流程如何? 在此输入图像描述

java reactive-programming project-reactor spring-webflux

1
推荐指数
1
解决办法
1228
查看次数

Reactor:将 zipWhen 与另一个元组一起使用后如何获得扁平元组?

当我链接多个zipWhen调用时,结果将是 aTuble2<Tuple2<Foo, Bar>, Bam>而不是 a Tuple3<Foo, Bar, Bam>。随后的每一次这种情况都会变得更糟zipWhen

例子:

val getFoo()
  .zipWhen { foo ->
    getBar(foo)
  }
  .zipWhen { fooBar -> 
    getBam(fooBar.t1, fooBar.t2)
  }
  .doOnNext { fooBarBam ->
    log.debug { "foo: ${fooBarBam.t1.t1}" }
    log.debug { "bar: ${fooBarBam.t1.t2}" }
    log.debug { "bam: ${fooBarBam.t2}" }
  }
Run Code Online (Sandbox Code Playgroud)

获取 doOnNext 的最优雅且可重用的方法是什么Tubple3

reactive-programming kotlin project-reactor

1
推荐指数
1
解决办法
2394
查看次数

你有测试来显示反应器 map() 和 flatMap() 之间的差异吗?

我仍在尝试了解 reactor map() 和 flatMap() 方法之间的区别。首先我查看了 API,但它并没有真正的帮助,它让我更加困惑。然后我用谷歌搜索了很多,但似乎没有人有一个例子来使差异变得可以理解,如果有任何差异的话。

因此,我尝试编写两个测试来查看每种方法的不同行为。但不幸的是它并没有像我希望的那样工作......

第一个测试方法是测试反应式 flatMap() 方法:

@Test
void fluxFlatMapTest() {
    Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            .window(2)
            .flatMap(fluxOfInts -> fluxOfInts.map(this::processNumber).subscribeOn(Schedulers.parallel()))
            .doOnNext(System.out::println)
            .subscribe();
}
Run Code Online (Sandbox Code Playgroud)

输出符合预期,可解释,如下所示:

9 - parallel-2
1 - parallel-1
4 - parallel-1
25 - parallel-3
36 - parallel-3
49 - parallel-4
64 - parallel-4
81 - parallel-5
100 - parallel-5
16 - parallel-2
Run Code Online (Sandbox Code Playgroud)

第二种方法应该测试 map() 方法的输出,以与 flatMap() 方法的上述结果进行比较。

@Test
void fluxMapTest() {
    final int start = 1;
    final int stop …
Run Code Online (Sandbox Code Playgroud)

java reactive-programming spring-boot spring-webflux

1
推荐指数
1
解决办法
399
查看次数

如何在空 Mono 之后链接反应操作而不阻塞?

基本上我想要实现的是调用第二个存储库(a ReactiveCrudRepository)或抛出异常,具体取决于调用第一个存储库的结果。

我最初的想法是这样的:

/** Reactive with blocking code */
public Flux<SecondThing> getThings(String firstThingName) {
    FirstThing firstThing = firstRepo
        .findByName(firstThingName)
        // Warning: "Inappropriate blocking method call"
        .blockOptional()  // this fails in test-context
        .orElseThrow(() -> new FirstThingNotFound(firstThingName));

    return secondRepo.findAllByFirstThingId(firstThing.getId());
}
Run Code Online (Sandbox Code Playgroud)

这对应于以下非反应式方法:

/** Non-reactive */
public List<SecondThing> getThings(String firstThingName) {
    FirstThing firstThing = firstRepo
        .findByName(firstThingName)
        .orElseThrow(() -> new FirstThingNotFound(firstThingName));

    return secondRepo.findAllByFirstThingId(firstThing.getId());
}
Run Code Online (Sandbox Code Playgroud)

我还没有找到一种方法以反应式非阻塞方式做到这一点。Mono我所需要的只是在第一次调用中出现空值时抛出错误,如果不为空则继续管道;但我在这里似乎无法正确使用onErrorStopdoOnError正确使用,并且map没有帮助,因为它跳过了空的Mono.

如果我使用id而不是,我有一个解决方法name,但我对它不太满意,因为它在 是 的实例FirstThing但没有 …

reactive-programming project-reactor spring-webflux

1
推荐指数
1
解决办法
2120
查看次数