我在相同的通量上使用 publishOn 和 subscribeOn,如下所示:
System.out.println("*********Calling Concurrency************");
List<Integer> elements = new ArrayList<>();
Flux.just(1, 2, 3, 4)
.map(i -> i * 2)
.log()
.publishOn(Schedulers.elastic())
.subscribeOn(Schedulers.parallel())
.subscribe(elements::add);
System.out.println("-------------------------------------");
Run Code Online (Sandbox Code Playgroud)
虽然,当我同时使用两者时,日志中没有打印任何内容。但是当我只使用 publishOn 时,我得到了以下信息日志:
*********Calling Concurrency************
[info] | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[info] | request(256)
[info] | onNext(1)
[info] | onNext(2)
[info] | onNext(3)
[info] | onNext(4)
[info] | onComplete()
-------------------------------------
Run Code Online (Sandbox Code Playgroud)
是不是publishOn 比subscribeOn 更值得推荐?或者它比 subscribeOn 有更多的偏好?两者有什么区别以及何时使用哪个?
publisher publish-subscribe reactive-programming project-reactor reactive-streams
它是错误,还是我的失败?你能解释一下我有什么问题吗?
我创建了简单的JPARepository
@Repository
interface UserRepository extends JpaRepository<User, Long> {
User findByName(String name);
Collection<User> findByIdNotIn(Collection<Long> users);
}
Run Code Online (Sandbox Code Playgroud)
看起来很正确.如果users不是空的话,它的工作正确.但否则它工作不正确:
result = userRepository.findByIdNotIn([]);
Run Code Online (Sandbox Code Playgroud)
它返回空结果,但它应该等于findAll方法调用的结果.
userRepository.findByIdNotIn([]).equals(userRepository.findAll());
Run Code Online (Sandbox Code Playgroud)
为了检查结果,我@Query在方法中添加了注释
@Repository
interface UserRepository extends JpaRepository<User, Long> {
User findByName(String name);
@Query('SELECT u FROM User u WHERE u.id NOT IN ?1')
Collection<User> findByIdNotIn(Collection<Long> users);
}
Run Code Online (Sandbox Code Playgroud)
在这种情况下,预期结果是正确的.我也试过使用原生Hibernate编写查询CriteriaBuilder
CriteriaBuilder builder = entityManager.getCriteriaBuilder();
CriteriaQuery<User> query = builder.createQuery(User.class);
Root<User> root = query.from(User.class);
query.where(builder.not(root.get("id").in([])));
result = entityManager.createQuery(query.select(root)).getResultList();
Run Code Online (Sandbox Code Playgroud)
在这种情况下,预期结果也是正确的.
结果Hibernate查询:更正结果(使用@Query注释): …
刚刚开始探索反应堆项目及其抽象Mono和Flux,并希望了解与java 8准系统CompletableFuture的基本差异.
这是一个简单的代码:
public static void main(String[] args) throws Exception {
Mono.fromCallable(() -> getData())
.map(s -> s + " World ")
.subscribe(s -> System.out.println(s));
CompletableFuture.supplyAsync(() -> getData())
.thenAccept(System.out::println);
System.out.println(Thread.currentThread()+" End ");
}
private static String getData() {
int j=0;
for(int i=0; i<Integer.MAX_VALUE; i++){
j = j - i%2;
}
System.out.println(Thread.currentThread()+" - "+j);
return " Hello ";
}
Run Code Online (Sandbox Code Playgroud)
首先,没有惊喜CompletableFuture.supplyAsync通过ForkJoinPool调度执行函数,并立即打印"End"行,程序终止,因为主线程在这里真的很短暂 - 正如预期的那样.
但Mono.fromCallable(...)阻塞主线程就在那里.此外,在getData()函数中打印的线程名称是主线程.所以我看到顺序/阻塞行为而不是顺序/非阻塞(异步)行为.是因为我在同一个线程上应用了一个订阅函数,它是阻塞的吗?有人可以解释一下吗?
我有以下异步任务:
public class AsyncValidationTask {
// Returns Mono.error(new Exception()) if error, otherwise Mono.empty()
public Mono<Void> execute(Object o);
}
Run Code Online (Sandbox Code Playgroud)
public class AsyncSaveTask {
// Returns Mono.error(new Exception()) if error, otherwise Mono of Object
public Mono<Object> execute(Object o);
}
Run Code Online (Sandbox Code Playgroud)
和下面的服务类:
public class AsyncService {
private AsyncValidationTask validation;
private AsyncSaveTask save;
public Mono<Object> validateAndSave(Object o) {
return Mono.defer(() -> this.validation.execute(o))
// Right now, the problem is that when validation completes successfully, it
// emits Mono.empty hence the flatMap chained below will not be …Run Code Online (Sandbox Code Playgroud) 我试图了解 Project reactor 为应用程序代码提供的关于数据可见性的保证。例如,我希望下面的代码会失败,但经过一百万次迭代后不会失败。我正在更改线程 A 上典型 POJO 的状态并从线程 B 读取它。 Reactor 是否保证 POJO 更改在线程间可见?
public class Main {
public static void main(String[] args) {
Integer result = Flux.range(1, 1_000_000)
.map(i -> {
Data data = new Data();
data.setValue(i);
data.setValueThreeTimes(i);
data.setValueObj(i + i);
return data;
})
.parallel(250)
.runOn(Schedulers.newParallel("par", 500))
.map(d -> {
d.setValueThreeTimes(d.getValueThreeTimes() + d.getValue());
return d;
})
.sequential()
.parallel(250)
.runOn(Schedulers.newParallel("par", 500))
.map(d -> {
d.setValueThreeTimes(d.getValueThreeTimes() + d.getValue());
return d;
})
// .sequential()
.map(d -> {
if (d.getValue() * 3 != …Run Code Online (Sandbox Code Playgroud) 根据 Mono 和 Flux 的定义,它们都代表一个异步数据序列,在订阅之前什么也不会发生。
并且有两大类出版商:热出版商和冷出版商。Mono 和 Flux 为每个订阅重新生成数据。如果未创建订阅,则永远不会生成数据。
另一方面,热门发布者不依赖于任何数量的订阅者。
这是我的冷流代码:
System.out.println("*********Calling coldStream************");
Flux<String> source = Flux.fromIterable(Arrays.asList("ram", "sam", "dam", "lam"))
.doOnNext(System.out::println)
.filter(s -> s.startsWith("l"))
.map(String::toUpperCase);
source.subscribe(d -> System.out.println("Subscriber 1: "+d));
source.subscribe(d -> System.out.println("Subscriber 2: "+d));
System.out.println("-------------------------------------");
Run Code Online (Sandbox Code Playgroud)
这是输出:
*********Calling composeStream************
ram
sam
dam
lam
Subscriber 1: LAM
ram
sam
dam
lam
Subscriber 2: LAM
-------------------------------------
Run Code Online (Sandbox Code Playgroud)
如何将上述冷流转换为热流?
publish-subscribe reactive-programming project-reactor reactive-streams
我想了解更多有关springboot webflux的基础并发模型的信息?
对于CPU密集型Web服务,传统的阻塞多线程模型是否更合适?还是根据本文https://people.eecs.berkeley.edu/~brewer/papers/threads-hotos-2003.pdf,在一般传统线程池模型中更合适?
如果我在反应堆链中有阻塞步骤,则可以使用publishOn将其调度到其他线程池。这样是否可以释放原始线程并使整个链仍然畅通无阻?
假设我有两个 Flux,如下所示:
Flux<Integer> f1 = Flux.just(10,20,30,40);
Flux<Integer> f2 = Flux.just(100,200,300,400);
Run Code Online (Sandbox Code Playgroud)
现在我想要的是将这些通量组合成单个通量或两个通量的元组,这将在单个通量中具有两个通量的元素。
我使用 zipwith 方法尝试了以下操作:
Flux<Integer, Integer> zipped = f1.zipWith(f2,
(one, two) -> one + "," +two)
.subscribe();
Run Code Online (Sandbox Code Playgroud)
但这会产生编译时错误:
Incorrect number of arguments for type Flux<T>; it cannot be parameterized with arguments <Integer, Integer>
Run Code Online (Sandbox Code Playgroud)
我怎样才能做到这一点?请建议。
java publisher publish-subscribe project-reactor reactive-streams