我正在使用Kafka,我们有一个用例来构建一个容错系统,甚至连一条消息都不会错过.所以这就是问题所在:如果由于任何原因(ZooKeeper down,Kafka broker等)向Kafka发布失败,我们如何能够有效地处理这些消息并在事情再次恢复后重播它们.正如我所说的那样,即使单个消息失败也无法承受.另一个用例是我们还需要在任何给定的时间点知道有多少消息由于任何原因而无法发布到Kafka,例如计数器功能,现在这些消息需要再次重新发布.
其中一个解决方案是将这些消息推送到某个数据库(如Cassandra,其中写入速度非常快,但我们还需要计数器功能,我猜Cassandra计数器功能并不是那么好,我们不想使用它.)可以处理这种负载也为我们提供了非常准确的计数器设施.
这个问题更多来自架构方面,然后是使用哪种技术来实现这一目标.
PS:我们处理像3000TPS这样的地方.因此,当系统启动失败时,这些失败的消息可以在非常短的时间内快速增长.我们正在使用基于java的框架.
谢谢你的帮助!
我开始使用 Project reactor,其中一个让我苦苦挣扎的地方是如何将 Mono 的东西与 Flux 结合起来。这是我的用例:
public interface GroupRepository {
Mono<GroupModel> getGroup(Long groupId);
}
public interface UserRepository {
Flux<User> getUsers(Set<Long> userIds);
}
Mono<GroupModel> groupMono = getGroup(groupId);
Flux<User> userFlux = getUsers(Set<Long> users);
//run above instrtuction in parallel and associate user to group.
Run Code Online (Sandbox Code Playgroud)
现在我想要实现的是:
如何组合来自 UserFlux 的响应并将这些用户与该组相关联,例如 group.addUsers(userfromFlux)。
有人可以帮助如何组合来自 userFlux 和 groupMono 的结果。我想我使用了 Zip 之类的东西,但它会从源代码进行一对一映射。就我而言,我需要进行 1 到 N 映射。在这里,我有一个组但需要添加到该组的多个用户。返回Mono<List<Users>然后将 zip 运算符与 mono 一起使用并提供此处提到的组合器是个好主意
public static <T1, T2, O> Flux<O> zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
final BiFunction<? …
我正在使用 Spring 5 反应式堆栈编写一个应用程序。目前,我们正在使用 spring-security-kerberos-core 1.1 以及 Spring 4 工件。想知道在 Spring 5 中与反应式堆栈一起编写内容时,是否有可用于 kerberos-core 的反应式库?
我正在使用Project Reactor我正在写的新服务。我使用的是Spring 5带Netty。我正在与许多不同的服务和关系数据库进行交互。所有这些服务都有一个正在阻止并且JDBC也在阻止的客户端。因此,基本上所有这些网络调用都不返回发布者。
我的问题是使用所有阻塞API时的最佳做法是什么。目前,我所做的是将所有阻塞的API代码包装Mono.callable并使用subscribeOn(Scheduler.elastic())。因此,基本上所有阻塞工作都是在弹性线程池上完成的。
问题1.我是否应该创建专用的线程池执行程序,而不是使用Elastic?如果是,那为什么呢?还是应该为每个不同的服务创建一个专用的弹性池?
问题2.在阻塞方法返回结果时,我应该使用publishOn以便主线程再次进行处理吗?如果是,如何获取主线程(netty事件循环线程?)引用?
问题3.如果我要在多个不同的运算符中调用多个阻塞调用(某些在链接时,有些在使用zip()调用时),那么使用subscribe publishand再次subscribe publish不会带来很多上下文切换?
这些问题大多数与最佳实践有关。
我想知道每个Cassandra节点中的数据是否均匀分布.是否有nodetool命令可以让我概述在哪个节点中存在多少数据.我只想确保数据均匀分布在所有这些Cassandra节点中,并且没有热点.
我已经开始玩项目反应器,并希望将我们的一个 API 移动到反应式的做事方式。我想知道什么是立即处理像 ListenableFuture 这样的事情。
就我而言,我使用的是 Cassandra,当我调用session.executeAsync()它时,它会返回一个扩展了 ListenableFuture 的 ResultSetFuture。下面是我现在编写的代码示例,我似乎对向客户公开 ListenableFuture 不满意。
public Mono<ListenableFuture<Void>> save(Publisher<AccountDTO> accountPublisher) {
return Mono.just(accountPublisher)
.map(accountDTO -> {
Account accountEntity = modelMapper.map(accountDTO, Account.class);
return mappingManager.mapper(Account.class).saveAsync(accountEntity);
})
.retry(1)
.doOnError(throwable -> log.error("Unable to create account "))
.mapError(throwable -> new MyCustomException(""));
}
Run Code Online (Sandbox Code Playgroud)
我的问题是:
公开 ListenableFuture 是否是一个好习惯,我个人不想将这样的任何东西回馈给他们可以阻止的客户端。在项目反应器中是否有更好的方法来处理这个问题,我可以只返回 Mono?
我需要将日期字符串转换为另一种特定格式。
例如:我有一个日期可以作为YYYY-MM-DDorYYYY-MM-DDThh:mm:ss±0000并且我需要将其转换为 something "yyyy-mm-ddTHH:MM:SSZ"。
我使用 Java 8 API 尝试了一些东西,但找不到出路。
我开始使用项目反应器。有谁知道如何将线程局部变量从一个线程传递到另一个线程?我看到了一些方法,Hooks.java但无法弄清楚推荐的方法是什么。有人可以指点我一些文档或关于如何做的代码片段。谢谢。