说,我想检查Kafka中特定分区的第一条消息和最后一条消息的偏移量。我的想法是将assign(…)方法与seekToBeginning(…)和一起使用seekToEnd(…)。不幸的是,这不起作用。
如果设置AUTO_OFFSET_RESET_CONFIG为"latest",则seekToBeginning(…)无效;如果设置为,则无效。如果我将其设置为"earliest",seekToEnd(…)则不起作用。看来,对我的消费者来说唯一重要的是AUTO_OFFSET_RESET_CONFIG。
我看过类似的主题,但是问题是针对的subscribe(),而不是assign()方法。提出的解决方案是实施ConsumerRebalanceListner并将其作为参数传递给该subscribe()方法。不幸的是,该assign()方法只有一个签名,并且只能获取主题分区的列表。
现在的问题是:是否有可能使用seekToBeginning()或seekToEnd()与assign()方法。如果是,怎么办?如果没有,为什么?
我的代码的相关片段:
KafkaConsumer<String, ProtoMeasurement> consumer = createConsumer();
TopicPartition zeroP = new TopicPartition(TOPIC, 1);
List<TopicPartition> partitions = Collections.singletonList(zeroP);
consumer.assign(partitions);
consumer.poll(Duration.ofSeconds(1));
consumer.seekToBeginning(partitions);
long currOffsetPos = consumer.position(zeroP);
LOGGER.info("Current offset {}.", currOffsetPos);
ConsumerRecords<String, ProtoMeasurement> records = consumer.poll(Duration.ofMillis(100));
// ...
Run Code Online (Sandbox Code Playgroud)
记录器打印偏移量n,它是所考虑主题的最大(最新)偏移量。
我正在准备一个 stackblitz 来解决我在子父母沟通方面遇到的问题,但在此过程中,我遇到了一个不同的问题,即我收到以下错误:No value accessor for form control with name: 'endDateFC'。对于 ,也会发生这种情况startDateFC。
我从 SO 尝试了以下操作:
ReactiveFormsModule和FormsModuleFormsModule。我只在以下位置导入app.modules.ts根据:表单控件没有值访问器
formControlName位于值访问器元素上我的代码有什么问题吗?这是 stackblitz:https://stackblitz.com/edit/primeng-calendar-v-zdd3of ?file=src/app/postpone-dialog/postpone-dialog.component.ts