BdE*_*eer 7 apache-kafka kafka-producer-api spring-kafka
我使用 kafka_2.11-2.1.1 和 Producer 使用 spring 2.1.0.RELEASE。
我在向 Kafka 主题发送消息时正在使用 spring,我的生产者生成了很多 TimeoutExceptions
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for COMPANY_INBOUND--19: 229 ms has passed since batch creation plus linger time
Run Code Online (Sandbox Code Playgroud)
我正在使用以下 kafka 生产者设置
acks: 1
retries: 1
batchSize: 100
lingerMs: 5
bufferMemory: 33554432
requestTimeoutMs: 60
Run Code Online (Sandbox Code Playgroud)
我尝试了很多组合(特别是batchSize& lingerMs)但没有任何效果。任何帮助请问上述场景的设置应该是什么。
使用以下配置再次尝试......但没有运气同样的错误
acks = 1
batch.size = 15
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class com.spgmi.ca.prescore.partition.CompanyInfoPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 120
retries = 1
Run Code Online (Sandbox Code Playgroud)
第二次运行:
我尝试了不同的组合,但没有任何效果。因此我认为这会是网络、SSL 等的问题。所以我在生产者运行的同一台机器上安装并运行了 Kafka,即在我的本地计算机上。
我尝试再次运行指向本地 Kafka 主题的生产者。但没有运气同样的问题。
下面是使用的配置参数。
2019-07-02 05:55:36.663 INFO 9224 --- [lt-dispatcher-2] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = 1
batch.size = 0
bootstrap.servers = [localhost:9092]
request.timeout.ms = 60
retries = 1
buffer.memory = 33554432
linger.ms = 0
client.id =
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = []
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
Run Code Online (Sandbox Code Playgroud)
面临同样的错误: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for inbound_topic--1: 69 ms 自批处理创建加上逗留时间过去
还尝试了 batch.size 5 , 10 & 0 linger_ms 0 , 5 , 10 等 request_time_out 0 , 45, 60, 120 , 300 等。
没有任何作用......同样的错误。
我还应该尝试什么,可能的解决方案是什么?
如何避免负键生成
是的,我设置了本地设置并打印带有分区信息的日志,如下所示
2019-07-03 02:48:28.822 INFO 7092 --- [lt-dispatcher-2] cscppCompanyInfoPartitioner:主题:inbound_topic Key = 597736248-Entropy Cayman Solar Ltd.-null-null-null Partition = -1072 03 02:48:28.931 错误 7092 --- [广告 | producer-1] osksupport.LoggingProducerListener :发送带有 key='597736248-Entropy Cayman Solar Ltd.-null-null-null' 和 payload='com.spgmi.ca.prescore.model.Company@8b12343' 的消息时抛出异常主题 inbound_topic :
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for inbound_topic --1: 104 ms 自批量创建以来已经过去了加上逗留时间
我的主题 inbound_topic 有两个分区,如下所示 C:\Software\kafka\kafka_2.11-2.1.1\bin\windows>kafka-topics.bat --describe --zookeeper localhost:2181 --topic inbound_topic Topic:inbound_topic PartitionCount:2 ReplicationFactor:1 Configs: Topic: inbound_topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: inbound_topic Partition: 1 Leader: 0 Replicas: 0 Isr: 0
但是我的制作人似乎试图发送到 Partition = -1。
我的分区逻辑如下
int p = (((String)key).hashCode() * Integer.MAX_VALUE) % numPartitions;
logger.info("Topic : "+ topic + "\t Key = " + (String)key + " Partition = " + p );
Run Code Online (Sandbox Code Playgroud)
关键是我在做 hashCode()。这里需要纠正什么以避免这个负数分区号?即分区 = -1
我的分区键逻辑应该是什么样的?
任何帮助高度appriciated。
该错误表明某些记录以比从客户端发送的速度更快的速度放入队列。
当您的 Producer 发送消息时,它们会存储在缓冲区中(在将它们发送到目标代理之前),并且将记录分组在一起以增加吞吐量。当一个新记录被添加到批处理中时,它必须在一个可配置的时间窗口内发送,该窗口由request.timeout.ms(默认设置为 30 秒)控制。如果批处理在队列中的时间更长,TimeoutException则会抛出 a 并且批处理记录将从队列中删除并且不会传递给代理。
增加的价值request.timeout.ms应该对你有用。
如果这不起作用,您还可以尝试减少batch.size以更频繁地发送批次(但这次将包含更少的消息)并确保将linger.ms其设置为 0(这是默认值)。
请注意,您需要在更改任何配置参数后重新启动 kafka 代理。
如果您仍然收到错误,我认为您的网络出现问题。您是否启用了 SSL?
| 归档时间: |
|
| 查看次数: |
13586 次 |
| 最近记录: |