我正在尝试实现基本场景,从头开始重新阅读主题(至少 1 条消息),并且我面临意外的行为。
假设有 1 个分区主题恰好容纳 100 万条消息,1 个消费者的偏移量已在中间某处提交,没有活动的生产者。
首先我尝试过
consumer.subscribe(Collections.singletonList(topic));
consumer.seekToBeginning(Collections.emptySet());
consumer.poll(Duration.ofMillis(longTimeout)); //no loop to simplify
Run Code Online (Sandbox Code Playgroud)
这不起作用(没有轮询消息)。我读过这seekToBeginning
是懒惰的(而且没关系),但事实证明,seekToBeginning
根本没有影响,因为它需要已经分配的分区,只有第一次轮询才会发生这种情况。它应该在文档中描述吗,还是我错过了?
然后我尝试过
consumer.subscribe(Collections.singletonList(topic));
consumer.poll(Duration.ofMillis(assignTimeout));
consumer.seekToBeginning(Collections.emptySet());
consumer.poll(Duration.ofMillis(longTimeout));//no loop to simplify
Run Code Online (Sandbox Code Playgroud)
事实证明,这取决于assignTimeout
. 这应该足以完成加入过程。该时间可能会有所不同,并且不可能依赖它。
然后我提供ConsumerRebalanceListener
了
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
consumer.seekToBeginning(partitions);
}
Run Code Online (Sandbox Code Playgroud)
并单poll
左。它似乎终于起作用了。
所以问题是:
seekToBeginning
不是之后subscribe
就没用了?应该记录下来吗?ConsumerRebalanceListener
可靠吗?它是否保证在应用搜索之前不会轮询来自中间(提交偏移量)的消息?我正在使用带有 curator 2.12.0 的 Zookeeper。
我通过使用观察者调用 getChildren (我事先不知道完整的节点路径)来成功观察新节点,该观察者提交任务以再次使用观察者调用 getChildren 。现在我想观看节点删除并且仅删除。我用观察者调用 checkExists 。但是,如果节点由于某种原因不存在,它实际上将成为节点创建的观察者,这在我的情况下永远不会发生。所以我会留下越来越多的“鞭打”观察者,我想这会浪费一些资源。
如何仅在节点存在时删除观察者或添加观察者?
Flux.just("a", "b")
.flatMap(s -> s.equals("a") ? Mono.error(new RuntimeException() : Flux.just(s + "1", s + "2"))
.onErrorResume(throwable -> Mono.empty())
.subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)
你好!
在这里,我对两个元素进行了处理,然后通过flatMap将第一个暴露给异常,第二个暴露给另一个Flux。
随着onErrorResume
我期望的输出
b1
b2
Run Code Online (Sandbox Code Playgroud)
但一无所获。有人可以解释为什么会发生吗?
谢谢。