小编G.D*_*rov的帖子

Apache Kafka:查找和分配。从头开始可靠阅读

我正在尝试实现基本场景,从头开始重新阅读主题(至少 1 条消息),并且我面临意外的行为。

假设有 1 个分区主题恰好容纳 100 万条消息,1 个消费者的偏移量已在中间某处提交,没有活动的生产者。

首先我尝试过

  consumer.subscribe(Collections.singletonList(topic));
  consumer.seekToBeginning(Collections.emptySet());
  consumer.poll(Duration.ofMillis(longTimeout)); //no loop to simplify
Run Code Online (Sandbox Code Playgroud)

这不起作用(没有轮询消息)。我读过这seekToBeginning是懒惰的(而且没关系),但事实证明,seekToBeginning根本没有影响,因为它需要已经分配的分区,只有第一次轮询才会发生这种情况。它应该在文档中描述吗,还是我错过了?

然后我尝试过

  consumer.subscribe(Collections.singletonList(topic));
  consumer.poll(Duration.ofMillis(assignTimeout));
  consumer.seekToBeginning(Collections.emptySet());
  consumer.poll(Duration.ofMillis(longTimeout));//no loop to simplify
Run Code Online (Sandbox Code Playgroud)

事实证明,这取决于assignTimeout. 这应该足以完成加入过程。该时间可能会有所不同,并且不可能依赖它。

然后我提供ConsumerRebalanceListener

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
      consumer.seekToBeginning(partitions);
    }
Run Code Online (Sandbox Code Playgroud)

并单poll左。它似乎终于起作用了。

所以问题是:

  1. seekToBeginning不是之后subscribe就没用了?应该记录下来吗?
  2. 解决方案ConsumerRebalanceListener可靠吗?它是否保证在应用搜索之前不会轮询来自中间(提交偏移量)的消息?

java apache-kafka kafka-consumer-api

3
推荐指数
1
解决办法
3173
查看次数

Zookeper + Curator:监视节点删除/删除监视

我正在使用带有 curator 2.12.0 的 Zookeeper。

我通过使用观察者调用 getChildren (我事先不知道完整的节点路径)来成功观察新节点,该观察者提交任务以再次使用观察者调用 getChildren 。现在我想观看节点删除并且仅删除。我用观察者调用 checkExists 。但是,如果节点由于某种原因不存在,它实际上将成为节点创建的观察者,这在我的情况下永远不会发生。所以我会留下越来越多的“鞭打”观察者,我想这会浪费一些资源。

如何仅在节点存在时删除观察者或添加观察者?

java apache-zookeeper apache-curator

2
推荐指数
1
解决办法
1650
查看次数

项目反应堆:flatMap之后的onErrorResume

Flux.just("a", "b")
        .flatMap(s -> s.equals("a") ? Mono.error(new RuntimeException() : Flux.just(s + "1", s + "2"))
        .onErrorResume(throwable -> Mono.empty())
        .subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)

你好!

在这里,我对两个元素进行了处理,然后通过flatMap将第一个暴露给异常,第二个暴露给另一个Flux。

随着onErrorResume我期望的输出

b1
b2
Run Code Online (Sandbox Code Playgroud)

但一无所获。有人可以解释为什么会发生吗?

谢谢。

java reactor project-reactor reactive-streams

0
推荐指数
2
解决办法
1913
查看次数