eth*_*nny 8 java apache-kafka apache-kafka-streams
我试图<KStream>.process()
用a TimeWindows.of("name", 30000)
来批量处理一些KTable值并发送它们.似乎30秒超过了消费者超时间隔,之后Kafka认为该消费者已经解散并释放分区.
我已经尝试提高轮询频率和提交间隔以避免这种情况:
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "5000");
config.put(StreamsConfig.POLL_MS_CONFIG, "5000");
Run Code Online (Sandbox Code Playgroud)
不幸的是,这些错误仍在发生:
(很多这些)
ERROR o.a.k.s.p.internals.RecordCollector - Error sending record to topic kafka_test1-write_aggregate2-changelog
org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for kafka_test1-write_aggregate2-changelog-0
Run Code Online (Sandbox Code Playgroud)
其次是:
INFO o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator 12.34.56.7:9092 (id: 2147483547 rack: null) dead for group kafka_test1
WARN o.a.k.s.p.internals.StreamThread - Failed to commit StreamTask #0_0 in thread [StreamThread-1]:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)
Run Code Online (Sandbox Code Playgroud)
显然,我需要更频繁地将心跳发送回服务器.怎么样?
我的拓扑结构是:
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> lines = kStreamBuilder.stream(TOPIC);
KTable<Windowed<String>, String> kt = lines.aggregateByKey(
new DBAggregateInit(),
new DBAggregate(),
TimeWindows.of("write_aggregate2", 30000));
DBProcessorSupplier dbProcessorSupplier = new DBProcessorSupplier();
kt.toStream().process(dbProcessorSupplier);
KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
kafkaStreams.start();
Run Code Online (Sandbox Code Playgroud)
该KTable是关键,每30秒分组值.在Processor.init()中我打电话context.schedule(30000)
.
DBProcessorSupplier提供的实例DBProcessor.这是AbstractProcessor的一个实现,其中提供了所有覆盖.他们只做LOG,所以我知道每个人都被击中.
这是一个非常简单的拓扑结构,但很明显我在某个地方错过了一个步骤.
我知道我可以在服务器端进行调整,但我希望有一个客户端解决方案.我喜欢在客户端退出/死亡时很快就可以使用分区的概念.
为了简化问题,我从图中删除了聚合步骤.它现在只是消费者 - >处理器().(如果我直接将消费者发送给.print()
它,请快速工作,所以我知道没关系).(同样如果我通过.print()
它输出聚合(KTable)似乎也可以).
我发现.process()
- 应该.punctuate()
每隔30秒调用一次实际上是阻塞可变长度的时间并且随机输出(如果有的话).
我将调试级别设置为'debug'并重新启动.我看到很多消息:
DEBUG o.a.k.s.p.internals.StreamTask - Start processing one record [ConsumerRecord <info>
Run Code Online (Sandbox Code Playgroud)
但是.punctuate()
函数中的断点没有被击中.所以它做了很多工作,但没有让我有机会使用它.
Mat*_*Sax 10
一些澄清:
StreamsConfig.COMMIT_INTERVAL_MS_CONFIG
是提交间隔的下限,即在提交之后,下一次提交不会在此时间之前发生.基本上,Kafka Stream试图在这段时间过后尽快提交,但无法保证下次提交实际需要多长时间.StreamsConfig.POLL_MS_CONFIG
用于内部KafkaConsumer#poll()
呼叫,以指定呼叫的最大阻塞时间poll()
.因此,这两个值对心跳更有帮助.
Kafka Streams在处理记录时遵循"深度优先"策略.这意味着,在poll()
为每个记录执行之后,将执行拓扑的所有运算符.假设你有三个连续的地图,那么在下一个/第二个记录被处理之前,将为第一个记录调用所有三个地图.
因此,在poll()
完成第一次poll()
完全处理的所有记录之后,将进行下一次调用.如果您想更频繁地进行心跳,则需要确保单个poll()
调用获取的记录较少,这样处理所有记录所需的时间较少,而下一个记录poll()
将提前触发.
您可以使用配置参数KafkaConsumer
,你可以通过指定StreamsConfig
完成这个工作(见https://kafka.apache.org/documentation.html#consumerconfigs):
streamConfig.put(ConsumerConfig.XXX,VALUE);
max.poll.records
:如果减小此值,将轮询较少的记录session.timeout.ms
:如果增加此值,则有更多时间处理数据(为了完整性而添加此项,因为它实际上是客户端设置而不是服务器/代理端配置 - 即使您知道此解决方案并且不喜欢它: ))编辑
从Kafka开始
0.10.1
,可以(并推荐)在流配置中为消费者和procuder配置添加前缀.这避免了参数冲突,因为一些参数名称用于消费者和生产者,否则无法区分(并且将同时应用于消费者和生产者).要为参数添加前缀,您可以分别使用StreamsConfig#consumerPrefix()
或StreamsConfig#producerPrefix()
.例如:streamsConfig.put(StreamsConfig.consumerPrefix(ConsumerConfig.PARAMETER), VALUE);
还有一件事要补充:此问题中描述的场景是一个已知问题,并且已经有KIP-62为KafkaConsumer
发送心跳引入了后台线程,从而将心跳与poll()
呼叫分离.Kafka Streams将在即将发布的版本中利用这一新功能.
归档时间: |
|
查看次数: |
6490 次 |
最近记录: |