小编jam*_*ham的帖子

拆分/重新加入 Flux 的正确方法

以下是将 Flux 拆分为不同的处理路径并将它们连接回来的正确/惯用方法 - 就问题而言,不应丢弃事件,排序不重要,并且内存是无限的。

Flux<Integer> beforeFork = Flux.range(1, 10);

ConnectableFlux<Integer> forkPoint = beforeFork
    .publish()
;

Flux<String> slowPath = forkPoint
        .filter(i -> i % 2 == 0)
        .map(i -> "slow"+"_"+i)
        .delayElements(Duration.ofSeconds(1))
;

Flux<String> fastPath = forkPoint
        .filter(i -> i % 2 != 0)
        .map(i -> "fast"+"_"+i)
;

// merge vs concat since we need to eagerly subscribe to
// the ConnectableFlux before the connect()
Flux.merge(fastPath, slowPath)
    .map(s -> s.toUpperCase()) // pretend this is a more complex sequence
    .subscribe(System.out:println)
;
forkPoint.connect(); …
Run Code Online (Sandbox Code Playgroud)

project-reactor

6
推荐指数
1
解决办法
1032
查看次数

Reactor 3.x - 限制 groupBy Flux 的时间

不管上游的完整性如何,有没有办法强制 groupBy() 生成的 Flux 在一段时间后完成(或类似地,限制“打开”组的最大数量)?我有如下内容:

Flux<Foo> someFastPublisher;

someFastPublisher
  .groupBy(f -> f.getKey())
  .delayElements(Duration.ofSeconds(1)) // rate limit each group
  .flatMap(g -> g) // unwind the group
  .subscribe()
;
Run Code Online (Sandbox Code Playgroud)

并且遇到了 Flux 挂起的情况,假设是因为组数大于flatMap并发性。我可以增加flatMap并发性,但没有简单的方法来判断最大可能的大小是多少。相反,我知道Foo被分组的'sFoo.key将在时间/发布顺序上彼此接近,并且宁愿在 groupBy Flux 与 flatMap 并发性上使用某种时间窗口(并以两个不同的组 w 结束) / 同样key()没什么大不了的)。

我猜groupByFlux 不会在 onComplete 之前someFastPubisheronComplete - 即 Flux 被移交以flatMap保持“开放”(尽管他们不太可能获得新事件)。

我可以通过Integer.MAX在 groupBy 中预取或Integer.MAX并发来解决这个问题- 但是有没有办法控制组的“生命”?

project-reactor

6
推荐指数
1
解决办法
1667
查看次数

List&lt;String&gt; 的 Spring Data JPA“找不到 PersistentEntity”

很难弄清楚我是遇到错误还是做了一些愚蠢的事情......

Spring Boot v2.0.0.M7、spring-data-jpa、spring-data-rest、MySQL

以下@Query

@Query("select DISTINCT item.statusCode from Item item")
public List<String> lookupStatusCodes();
Run Code Online (Sandbox Code Playgroud)

在 aPagingAndSortingRepository上扔 a

java.lang.IllegalArgumentException: Couldn't find PersistentEntity for type class java.lang.String!
at org.springframework.data.mapping.context.PersistentEntities.lambda$getRequiredPersistentEntity$2(PersistentEntities.java:78) ~[spring-data-commons-2.0.2.RELEASE.jar:2.0.2.RELEASE]
at java.util.Optional.orElseThrow(Unknown Source) ~[na:1.8.0_151]
at org.springframework.data.mapping.context.PersistentEntities.getRequiredPersistentEntity(PersistentEntities.java:77) ~[spring-data-commons-2.0.2.RELEASE.jar:2.0.2.RELEASE]
at org.springframework.data.rest.webmvc.PersistentEntityResourceAssembler.wrap(PersistentEntityResourceAssembler.java:72) ~[spring-data-rest-webmvc-3.0.2.RELEASE.jar:3.0.2.RELEASE]
Run Code Online (Sandbox Code Playgroud)

statusCode 是一个 varchar 并且Item它本身像 @Entity 一样按预期工作,但是尝试投影到一个字符串列表(或List<Object>)会因上述情况而失败。

如果重要的话,我故意不想在Page<String>此处返回 a (由于预期结果集很小,因此不需要分页)。

spring-data-jpa spring-boot

5
推荐指数
1
解决办法
1万
查看次数