Son*_*123 95 java apache-kafka
I am sending String messages to Kafka V. 0.8 with the Java Producer API. If the message size is about 15 MB, I get a MessageSizeTooLargeException. I have tried to set message.max.bytes to 40 MB, but I still get the exception. Small messages work without problems.
(The exception appears in the producer; I am not using a consumer in this application.)
What can I do to get rid of this exception?
private ProducerConfig kafkaConfig() {
Properties props = new Properties();
props.put("metadata.broker.list", BROKERS);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
props.put("message.max.bytes", "" + 1024 * 1024 * 40);
return new ProducerConfig(props);
}
4709 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler - Failed to send requests for topics datasift with correlation ids in [213,224]
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)
lau*_*man 158
You need to adjust three (or four) properties:
- fetch.message.max.bytes: this determines the largest size of a message that can be fetched by the consumer.
- replica.fetch.max.bytes: this allows the replicas in the brokers to send messages within the cluster and makes sure the messages are replicated correctly. If this is too small, the message will never be replicated, and therefore the consumer will never see it, because the message will never be committed (fully replicated).
- message.max.bytes: this is the largest size of a message the broker can receive from a producer.
- max.message.bytes: this is the largest size of a message the broker will allow to be appended to a topic. This size is validated pre-compression. (Defaults to the broker's message.max.bytes.)
I found out about number 2 the hard way: you don't get any exception, message or warning from Kafka, so be sure to keep this in mind when you are sending large messages.
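For illustration, a hedged sketch of where these properties might be set for the roughly 40 MB case from the question (the 41943040 value and the usual file locations are assumptions, not part of the answer):

# Broker side, e.g. in config/server.properties
message.max.bytes=41943040
replica.fetch.max.bytes=41943040

# Topic level (set via the topic configuration rather than server.properties)
# max.message.bytes=41943040

# Consumer side (old 0.8 consumer configuration)
fetch.message.max.bytes=41943040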
Sas*_*ter 45
Minor changes are required for Kafka 0.10 and the new consumer compared to laughing_man's answer:
- Broker: increase message.max.bytes and replica.fetch.max.bytes. message.max.bytes has to be equal to or smaller than (*) replica.fetch.max.bytes.
- Producer: increase max.request.size to send the larger messages.
- Consumer: increase max.partition.fetch.bytes to receive larger messages.
(*) Read the comments to learn more about message.max.bytes <= replica.fetch.max.bytes.
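A hedged Java sketch of the client-side part with the newer (0.10+) clients; the broker address, group id and the 15 MB value are placeholders, not taken from the answer:

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;

public class LargeMessageClients {
    public static void main(String[] args) {
        int fifteenMb = 15 * 1024 * 1024;

        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");  // placeholder broker address
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("max.request.size", Integer.toString(fifteenMb));  // producer-side limit

        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");  // placeholder broker address
        consumerProps.put("group.id", "large-message-group");      // hypothetical consumer group
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("max.partition.fetch.bytes", Integer.toString(fifteenMb));  // consumer-side fetch limit

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
             KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            // use producer.send(...) and consumer.poll(...) as usual
        }
    }
}

The broker-side message.max.bytes and replica.fetch.max.bytes still go into the broker configuration, as in the previous answer.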
Mic*_*Ckr 13
I think most of the answers here are somewhat outdated or not entirely complete.
To build on the answer of Sacha Vetter (with the update for Kafka 0.10), I'd like to provide some additional information and links to the official documentation.
Producer Configuration:
max.request.size (Link) has to be increased for files bigger than 1 MB, otherwise they are rejected.
Broker/Topic configuration:
message.max.bytes (Link) may be set if one wants to increase the message size at broker level. But, from the documentation: "This can be set per topic with the topic level max.message.bytes config."
max.message.bytes (Link) may be increased if only one topic should be able to accept larger files; the broker configuration does not have to be changed.
I'd always prefer a topic-restricted configuration, due to the fact that I can configure the topic by myself as a client for the Kafka cluster (e.g. with the admin client, as sketched below). I may not have any influence on the broker configuration itself.
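As an illustration of that topic-restricted approach, a minimal sketch using the Java Admin client from newer kafka-clients versions; the topic name, partition/replication counts, message size and broker address are assumptions:

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateLargeMessageTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder broker address

        try (Admin admin = Admin.create(props)) {
            // Only this topic accepts messages up to ~15 MB;
            // the broker-wide message.max.bytes keeps its default.
            NewTopic topic = new NewTopic("large-message-topic", 1, (short) 1)
                    .configs(Map.of("max.message.bytes", Integer.toString(15 * 1024 * 1024)));
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}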
In the answers above, some more configurations are mentioned as necessary:
replica.fetch.max.bytes (Link) (Broker config). From the documentation: "This is not an absolute maximum, if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that progress can be made."
max.partition.fetch.bytes (Link) (Consumer config). From the documentation: "Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch will still be returned to ensure that the consumer can make progress."
fetch.max.bytes (Link) (Consumer config; not mentioned above, but same category). From the documentation: "Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that the consumer can make progress."
Conclusion: The configurations concerning fetching messages do not have to be changed in order to process messages bigger than their default values (tested in a small setup). Possibly the consumer then always gets batches of size 1. However, the two configurations from the first block have to be set, as mentioned in the answers before.
This explanation is not meant to say anything about performance, nor to recommend setting or not setting these configurations. The best values have to be evaluated individually, depending on the planned throughput and data structure.
use*_*587 11
You need to override the following properties:
Broker Configs ($KAFKA_HOME/config/server.properties)
Consumer Configs ($KAFKA_HOME/config/consumer.properties) (see the sketch below)
This step did not work for me. I added it to the consumer application instead and it worked fine.
Restart the server.
Please have a look at this documentation for more info: http://kafka.apache.org/08/configuration.html
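As a hedged sketch of the consumer-application override mentioned above, using the old 0.8 high-level consumer; the ZooKeeper address, group id and the 15728640 value are placeholders:

import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

public class LargeMessageConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");  // placeholder ZooKeeper address
        props.put("group.id", "large-message-group");       // hypothetical consumer group
        props.put("fetch.message.max.bytes", "15728640");   // allow fetching messages up to ~15 MB

        ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        // ... create message streams and consume as usual, then:
        connector.shutdown();
    }
}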
小智 9
The idea is to have the same size of message being sent from the Kafka Producer to the Kafka Broker and then received by the Kafka Consumer, i.e.
Kafka Producer --> Kafka Broker --> Kafka Consumer
Suppose the requirement is to send messages of 15 MB; then the Producer, the Broker and the Consumer, all three, need to be in sync.
Kafka Producer sends 15 MB --> Kafka Broker allows/stores 15 MB --> Kafka Consumer receives 15 MB
The settings therefore should be:
A.) On the Broker:
message.max.bytes=15728640
replica.fetch.max.bytes=15728640
B.) On the Consumer:
fetch.message.max.bytes=15728640
Keep in mind that the broker's message.max.bytes property has to be in sync with the consumer's fetch.message.max.bytes property. The fetch size must be at least as large as the maximum message size, otherwise there could be a situation where producers can send messages larger than the consumer can consume/fetch. It is worth taking a look at this.
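For reference, the value used above is simply 15 MB expressed in bytes: 15 * 1024 * 1024 = 15728640.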
Which version of Kafka are you using? Also provide some more details of the trace you are getting. Is there something like ... payload size of xxxx larger than 1000000 coming up in the log?
@laughing_man's answer is quite accurate. But still, I wanted to recommend an approach which I learned from Kafka expert Stephane Maarek on Quora.
Kafka isn't meant to handle large messages.
Your API should use cloud storage (e.g. AWS S3) and simply push a reference to S3 to Kafka or any message broker. You have to find somewhere to store your data, maybe a network drive, maybe whatever, but it shouldn't be the message broker.
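A hedged Java sketch of that reference-passing pattern; the bucket/key, broker address and the upload step itself are assumptions (the topic name datasift is taken from the question's logs):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ReferenceProducer {
    public static void main(String[] args) {
        // Assume the large payload has already been uploaded to object storage
        // (e.g. with the AWS SDK); only this small reference travels through Kafka.
        String s3Reference = "s3://my-bucket/uploads/payload-123.json";  // hypothetical object key

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("datasift", s3Reference));
        }
    }
}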
Now, if you don't want to go with the above solution:
The maximum size of a message in Apache Kafka is 1 MB (the setting in your brokers is called message.max.bytes). If you really needed it badly, you could increase that size, and make sure to increase the network buffers for your producers and consumers as well.
And if you really insist on splitting your message, make sure each split has the exact same key so that it is pushed to the same partition, and the message content should report a "part id" so that your consumer can fully reconstruct the message (see the sketch below).
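A hedged sketch of that splitting idea; the "partId/totalParts;" framing and the chunk size are assumptions, not a standard Kafka mechanism:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ChunkingProducer {
    static void sendInChunks(KafkaProducer<String, String> producer, String topic,
                             String key, String payload, int chunkSize) {
        int totalParts = (payload.length() + chunkSize - 1) / chunkSize;
        for (int part = 0; part < totalParts; part++) {
            int start = part * chunkSize;
            int end = Math.min(start + chunkSize, payload.length());
            // Prefix each chunk with "partId/totalParts;" so the consumer can reassemble the message;
            // the identical key keeps all chunks in the same partition, hence in order.
            String value = part + "/" + totalParts + ";" + payload.substring(start, end);
            producer.send(new ProducerRecord<>(topic, key, value));
        }
    }
}

The consumer side would then buffer chunks per key until all parts have arrived and concatenate them.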
You can also explore compression if your messages are text-based (gzip, snappy, lz4 compression), which may reduce the data size, but not magically.
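On the producer side this is a single setting with the newer Java producer; a minimal sketch (the codec choice is just an example):

producerProps.put("compression.type", "gzip");  // or "snappy" / "lz4"; batches are compressed before being sent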
Again, you have to use an external system to store that data and just push an external reference to Kafka. That is a very common architecture, one that you should go with and that is widely accepted.
Keep in mind that Kafka works best only if the messages are huge in amount, but not in size.
Source: https://www.quora.com/How-do-I-send-Large-messages-80-MB-in-Kafka