小编Ole*_*uka的帖子

Project Reactor 3 中的 publishOn 与 subscribeOn

我在相同的通量上使用 publishOn 和 subscribeOn,如下所示:

    System.out.println("*********Calling Concurrency************");
    List<Integer> elements = new ArrayList<>();
    Flux.just(1, 2, 3, 4)
      .map(i -> i * 2)
      .log()
      .publishOn(Schedulers.elastic())
      .subscribeOn(Schedulers.parallel())
      .subscribe(elements::add);
    System.out.println("-------------------------------------");
Run Code Online (Sandbox Code Playgroud)

虽然,当我同时使用两者时,日志中没有打印任何内容。但是当我只使用 publishOn 时,我得到了以下信息日志:

*********Calling Concurrency************
[info] | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[info] | request(256)
[info] | onNext(1)
[info] | onNext(2)
[info] | onNext(3)
[info] | onNext(4)
[info] | onComplete()
-------------------------------------
Run Code Online (Sandbox Code Playgroud)

是不是publishOn 比subscribeOn 更值得推荐?或者它比 subscribeOn 有更多的偏好?两者有什么区别以及何时使用哪个?

publisher publish-subscribe reactive-programming project-reactor reactive-streams

13
推荐指数
2
解决办法
1万
查看次数

方法命名中的Spring'NOT IN'不能像预期的那样工作

它是错误,还是我的失败?你能解释一下我有什么问题吗?

我创建了简单的JPARepository

@Repository
interface UserRepository extends JpaRepository<User, Long> {
    User findByName(String name);


    Collection<User> findByIdNotIn(Collection<Long> users);
}
Run Code Online (Sandbox Code Playgroud)

看起来很正确.如果users不是空的话,它的工作正确.但否则它工作不正确:

result = userRepository.findByIdNotIn([]);
Run Code Online (Sandbox Code Playgroud)

它返回空结果,但它应该等于findAll方法调用的结果.

userRepository.findByIdNotIn([]).equals(userRepository.findAll());
Run Code Online (Sandbox Code Playgroud)

为了检查结果,我@Query在方法中添加了注释

@Repository
interface UserRepository extends JpaRepository<User, Long> {
    User findByName(String name);

    @Query('SELECT u FROM User u WHERE u.id NOT IN ?1')
    Collection<User> findByIdNotIn(Collection<Long> users);
}
Run Code Online (Sandbox Code Playgroud)

在这种情况下,预期结果是正确的.我也试过使用原生Hibernate编写查询CriteriaBuilder

CriteriaBuilder builder = entityManager.getCriteriaBuilder();
CriteriaQuery<User> query = builder.createQuery(User.class);
Root<User> root = query.from(User.class);

query.where(builder.not(root.get("id").in([])));
result = entityManager.createQuery(query.select(root)).getResultList();
Run Code Online (Sandbox Code Playgroud)

在这种情况下,预期结果也是正确的.

附加信息

结果Hibernate查询:更正结果(使用@Query注释): …

java spring hibernate spring-data-jpa spring-boot

7
推荐指数
1
解决办法
1万
查看次数

Reactor Mono vs CompletableFuture

刚刚开始探索反应堆项目及其抽象Mono和Flux,并希望了解与java 8准系统CompletableFuture的基本差异.

这是一个简单的代码:

public static void main(String[] args) throws Exception {

    Mono.fromCallable(() -> getData())
            .map(s -> s + " World ")
            .subscribe(s -> System.out.println(s));

    CompletableFuture.supplyAsync(() -> getData())
            .thenAccept(System.out::println);

    System.out.println(Thread.currentThread()+" End ");
}

private static String getData() {

    int j=0;

    for(int i=0; i<Integer.MAX_VALUE; i++){
        j = j - i%2;
    }

    System.out.println(Thread.currentThread()+" - "+j);
    return " Hello ";
}
Run Code Online (Sandbox Code Playgroud)

首先,没有惊喜CompletableFuture.supplyAsync通过ForkJoinPool调度执行函数,并立即打印"End"行,程序终止,因为主线程在这里真的很短暂 - 正如预期的那样.

Mono.fromCallable(...)阻塞主线程就在那里.此外,在getData()函数中打印的线程名称是主线程.所以我看到顺序/阻塞行为而不是顺序/非阻塞(异步)行为.是因为我在同一个线程上应用了一个订阅函数,它是阻塞的吗?有人可以解释一下吗?

spring java-8 project-reactor spring-webflux

6
推荐指数
1
解决办法
3639
查看次数

Java reactor - 将 Mono&lt;Void&gt; 与另一个产生 Mono&lt;Object&gt; 的异步任务链接起来

我有以下异步任务:

public class AsyncValidationTask {
    // Returns Mono.error(new Exception()) if error, otherwise Mono.empty()
    public Mono<Void> execute(Object o);
}
Run Code Online (Sandbox Code Playgroud)
public class AsyncSaveTask {
    // Returns Mono.error(new Exception()) if error, otherwise Mono of Object
    public Mono<Object> execute(Object o);
}
Run Code Online (Sandbox Code Playgroud)

和下面的服务类:

public class AsyncService {

    private AsyncValidationTask validation;

    private AsyncSaveTask save;

    public Mono<Object> validateAndSave(Object o) {
        return Mono.defer(() -> this.validation.execute(o))
                   // Right now, the problem is that when validation completes successfully, it 
                   // emits Mono.empty hence the flatMap chained below will not be …
Run Code Online (Sandbox Code Playgroud)

java reactive-programming project-reactor reactive-streams

6
推荐指数
1
解决办法
4174
查看次数

Project Reactor 和 Java 内存模型

我试图了解 Project reactor 为应用程序代码提供的关于数据可见性的保证。例如,我希望下面的代码会失败,但经过一百万次迭代后不会失败。我正在更改线程 A 上典型 POJO 的状态并从线程 B 读取它。 Reactor 是否保证 POJO 更改在线程间可见?

public class Main {
    public static void main(String[] args) {
        Integer result = Flux.range(1, 1_000_000)
                .map(i -> {
                    Data data = new Data();
                    data.setValue(i);
                    data.setValueThreeTimes(i);
                    data.setValueObj(i + i);
                    return data;
                })
                .parallel(250)
                .runOn(Schedulers.newParallel("par", 500))
                .map(d -> {
                    d.setValueThreeTimes(d.getValueThreeTimes() + d.getValue());
                    return d;
                })
                .sequential()
                .parallel(250)
                .runOn(Schedulers.newParallel("par", 500))
                .map(d -> {
                    d.setValueThreeTimes(d.getValueThreeTimes() + d.getValue());
                    return d;
                })
                //                .sequential()
                .map(d -> {
                    if (d.getValue() * 3 != …
Run Code Online (Sandbox Code Playgroud)

java reactive-programming project-reactor

5
推荐指数
1
解决办法
604
查看次数

如何在 Project Reactor 3 中将冷流转换为热流?

根据 Mono 和 Flux 的定义,它们都代表一个异步数据序列,在订阅之前什么也不会发生。

并且有两大类出版商:热出版商和冷出版商。Mono 和 Flux 为每个订阅重新生成数据。如果未创建订阅,则永远不会生成数据。

另一方面,热门发布者不依赖于任何数量的订阅者。

这是我的冷流代码:

        System.out.println("*********Calling coldStream************");
        Flux<String> source = Flux.fromIterable(Arrays.asList("ram", "sam", "dam", "lam"))
                .doOnNext(System.out::println)
                .filter(s -> s.startsWith("l"))
                .map(String::toUpperCase);

        source.subscribe(d -> System.out.println("Subscriber 1: "+d));
        source.subscribe(d -> System.out.println("Subscriber 2: "+d));
        System.out.println("-------------------------------------");
Run Code Online (Sandbox Code Playgroud)

这是输出:

*********Calling composeStream************
ram
sam
dam
lam
Subscriber 1: LAM
ram
sam
dam
lam
Subscriber 2: LAM
-------------------------------------
Run Code Online (Sandbox Code Playgroud)

如何将上述冷流转换为热流?

publish-subscribe reactive-programming project-reactor reactive-streams

4
推荐指数
2
解决办法
4206
查看次数

Springboot Webflux反​​应器并发模型

我想了解更多有关springboot webflux的基础并发模型的信息?

对于CPU密集型Web服务,传统的阻塞多线程模型是否更合适?还是根据本文https://people.eecs.berkeley.edu/~brewer/papers/threads-hotos-2003.pdf,在一般传统线程池模型中更合适?

如果我在反应堆链中有阻塞步骤,则可以使用publishOn将其调度到其他线程池。这样是否可以释放原始线程并使整个链仍然畅通无阻?

spring-boot project-reactor reactor-netty

4
推荐指数
1
解决办法
950
查看次数

如何通过将两个 Flux 中的值配对到一个元组中来组合发布者?

假设我有两个 Flux,如下所示:

    Flux<Integer> f1 = Flux.just(10,20,30,40);
    Flux<Integer> f2 = Flux.just(100,200,300,400);
Run Code Online (Sandbox Code Playgroud)

现在我想要的是将这些通量组合成单个通量或两个通量的元组,这将在单个通量中具有两个通量的元素。

我使用 zipwith 方法尝试了以下操作:

  Flux<Integer, Integer> zipped = f1.zipWith(f2,
                                            (one, two) ->  one + "," +two)
                                    .subscribe();
Run Code Online (Sandbox Code Playgroud)

但这会产生编译时错误:

 Incorrect number of arguments for type Flux<T>; it cannot be parameterized with arguments <Integer, Integer>
Run Code Online (Sandbox Code Playgroud)

我怎样才能做到这一点?请建议。

java publisher publish-subscribe project-reactor reactive-streams

2
推荐指数
1
解决办法
2965
查看次数