如何使用Kafka(超过15MB)发送大量邮件?

Son*_*123 95 java apache-kafka

我使用Java Producer API将String-messages发送到Kafka V. 0.8.如果邮件大小约为15 MB,我会得到一个MessageSizeTooLargeException.我试图设置message.max.bytes为40 MB,但我仍然得到例外.小消息没有问题.

(例外情况出现在制作人中,我在此应用程序中没有使用者.)

我该怎么做才能摆脱这种异常?

我的示例生产者配置

private ProducerConfig kafkaConfig() {
    Properties props = new Properties();
    props.put("metadata.broker.list", BROKERS);
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");
    props.put("message.max.bytes", "" + 1024 * 1024 * 40);
    return new ProducerConfig(props);
}
Run Code Online (Sandbox Code Playgroud)

错误日志:

4709 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with    correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with   correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to send requests for topics datasift with correlation ids in [213,224]

kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)
Run Code Online (Sandbox Code Playgroud)

lau*_*man 158

您需要调整三(或四)个属性:

  • 消费者方面:fetch.message.max.bytes- 这将确定消费者可以获取的消息的最大大小.
  • 代理方:replica.fetch.max.bytes- 这将允许代理中的副本在群集内发送消息,并确保正确复制消息.如果这个太小,则永远不会复制该消息,因此,消费者永远不会看到该消息,因为该消息永远不会被提交(完全复制).
  • 经纪人方面:message.max.bytes- 这是经纪人可以从生产者处收到的最大消息大小.
  • 代理方(每个主题):max.message.bytes- 这是代理允许附加到主题的消息的最大大小.此大小经过预压缩验证.(默认为经纪人message.max.bytes.)

我发现了关于2号的困难方法 - 你没有从Kafka获得任何异常,消息或警告,所以在发送大量消息时一定要考虑这一点.

  • 是.在消费者方面,为每个分区分配`fetch.message.max.bytes`内存.这意味着如果你为`fetch.message.max.bytes`使用一个庞大的数字并结合大量的分区,它将消耗大量的内存.实际上,由于代理之间的复制过程也是一个专门的消费者,这也会占用代理的内存. (7认同)
  • 好的,你和user2720864是正确的.我只在源代码中设置了`message.max.bytes`.但我必须在Kafka服务器`config/server.properties`的配置中设置这些值.现在还有更大的消息工作:). (3认同)
  • 是否存在设置这些值太高的已知缺点? (3认同)
  • 注意,每个主题*还有一个`max.message.bytes`配置*,它可以低于代理的`message.max.bytes`. (3认同)
  • 根据官方文档,消费者方面的参数和有关代理之间复制的参数`/.*fetch.*bytes/` 似乎不是硬限制:“这不是绝对最大值,如果 [... ] 大于此值,仍将返回记录批次以确保可以取得进展。” (2认同)

Sas*_*ter 45

Laugh_man的答案相比,Kafka 0.10新消费者需要进行微小的更改:

  • 经纪人:没有变化,你仍然需要增加属性message.max.bytesreplica.fetch.max.bytes.message.max.bytes必须等于或小于(*)replica.fetch.max.bytes.
  • 制作人:增加max.request.size发送更大的消息.
  • 消费者:增加max.partition.fetch.bytes以接收更大的消息.

(*)阅读评论以了解有关message.max.bytes<=的更多信息replica.fetch.max.bytes

  • 您知道为什么“ message.max.bytes”必须小于“ replica.fetch.max.bytes”吗? (2认同)
  • “ ** replica.fetch.max.bytes **(默认值:1MB)–代理可以复制的最大数据大小。此大小必须大于” message.max.bytes **”,否则代理将接受消息并且无法复制它们。从而导致潜在的数据丢失。” 资料来源:[handling-large-messages-kafka](http://ingest.tips/2015/01/21/handling-large-messages-kafka/) (2认同)
  • 感谢您通过链接回复我。这似乎也呼应了 [Cloudera 指南](https://www.cloudera.com/documentation/kafka/latest/topics/kafka_performance.html) 的建议。然而,这两个都是错误的 - 请注意,它们没有提供任何技术原因来说明_为什么_`replica.fetch.max.bytes`应该严格大于`message.max.bytes`。Confluent 员工 [今天早些时候确认](https://lists.apache.org/thread.html/dc9c636dd0b4b1bfe6a0310b6e550ebf71ef6b197910ab40d7500bca@%3Cusers.kafka.apache.org%3E),我可以怀疑的两个数量:平等的。 (2认同)
  • 关于`message.max.bytes &lt;replica.fetch.max.bytes`或`message.max.bytes = replica.fetch.max.bytes` @Kostas是否有任何更新? (2认同)
  • 是的,它们可以相等:https://www.mail-archive.com/users@kafka.apache.org/msg25494.html(Ismael 为 Confluent 工作) (2认同)

Mic*_*Ckr 13

I think, most of the answers here are kind of outdated or not entirely complete.

To refer on the answer of Sacha Vetter (with the update for Kafka 0.10), I'd like to provide some additional Information and links to the official documentation.


Producer Configuration:

Broker/Topic configuration:

  • message.max.bytes (Link) may be set, if one like to increase the message size on broker level. But, from the documentation: "This can be set per topic with the topic level max.message.bytes config."
  • max.message.bytes (Link) may be increased, if only one topic should be able to accept lager files. The broker configuration must not be changed.

I'd always prefer a topic-restricted configuration, due to the fact, that I can configure the topic by myself as a client for the Kafka cluster (e.g. with the admin client). I may not have any influence on the broker configuration itself.


In the answers from above, some more configurations are mentioned as necessary:

From the documentation: "This is not an absolute maximum, if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that progress can be made."

From the documentation: "Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch will still be returned to ensure that the consumer can make progress."

From the documentation: "Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that the consumer can make progress."


结论:关于获取消息的配置无需更改即可处理消息,大于这些配置的默认值(已在小型设置中进行过测试)。也许,消费者可能总是获得大小为 1 的批次。但是,必须设置第一个块中的两个配置,如前面的答案中提到的。

此说明不应说明有关性能的任何信息,也不应建议设置或不设置这些配置。必须根据具体计划的吞吐量和数据结构单独评估最佳值。


use*_*587 11

您需要覆盖以下属性:

Broker Configs($ KAFKA_HOME/config/server.properties)

  • replica.fetch.max.bytes
  • message.max.bytes

消费者配置($ KAFKA_HOME/config/consumer.properties)
此步骤对我不起作用.我将它添加到消费者应用程序,它工作正常

  • fetch.message.max.bytes

重启服务器.

请查看此文档以获取更多信息:http: //kafka.apache.org/08/configuration.html


小智 9

这个想法是将相同大小的消息从Kafka Producer发送到Kafka Broker然后由Kafka Consumer接收,即

Kafka生产商 - > Kafka Broker - > Kafka Consumer

假设要求是发送15MB的消息,那么生产者,经纪人和消费者这三者都需要同步.

Kafka Producer发送15 MB - > Kafka Broker允许/商店15 MB - > Kafka Consumer获得15 MB

因此,设置应为A.)On Broker:message.max.bytes = 15728640 replica.fetch.max.bytes = 15728640

B.)On Consumer:fetch.message.max.bytes = 15728640

  • 是不是fetch.message.max.bytes被ConsumerConfig中的max.partition.fetch.bytes取代了? (2认同)

use*_*864 7

要记住,message.max.bytes属性必须与消费者的财产同步fetch.message.max.bytes.获取大小必须至少与最大消息大小一样大,否则可能存在生成器可以发送大于消费者可以使用/获取的消息的情况.值得一看的是它.
您使用的是哪个版本的Kafka?还提供了一些您将获得的更多详细信息跟踪.有什么东西像...... payload size of xxxx larger than 1000000出现在日志里吗?


Bha*_*ala 5

@laughing_man的答案非常准确。但是,我仍然想提出建议,这是我从Quora的Kafka专家Stephane Maarek那里学到的。

Kafka并非要处理大型邮件。

您的API应该使用云存储(Ex AWS S3),并且只需向Kafka或任何消息代理推送S3的引用即可。您必须找到某个地方来保存数据,也许是网络驱动器,也许是任何东西,但它不应该是消息代理。

现在,如果您不想采用上述解决方案

消息的最大大小为1MB(代理中的设置称为message.max.bytesApache Kafka。如果确实非常需要它,则可以增加该大小,并确保为生产者和消费者增加网络缓冲区。

而且,如果您真的很想拆分邮件,请确保每个拆分的邮件都具有完全相同的密钥,以便将其推送到同一分区,并且邮件内容应报告“部件ID”,以便您的使用者可以完全重建邮件。

如果您的消息是基于文本的(gzip,snappy,lz4压缩),则还可以探索压缩,这可能会减小数据大小,但并非神奇。

同样,您必须使用外部系统来存储该数据,然后仅将外部引用推送到Kafka。这是一种非常常见的体系结构,您应该使用并被广泛接受的体系结构。

请记住,仅当邮件数量巨大而不是大小时,Kafka才能发挥最佳作用。

资料来源:https : //www.quora.com/How-do-I-send-Large-messages-80-MB-in-Kafka

  • 卡夫卡处理大消息,绝对没有问题。Kafka 主页上的介绍页面甚至将其称为存储系统。 (3认同)
  • 您可能要注意,“您的”建议几乎是斯特凡·马雷克的Quora建议的逐字复本,网址为https://www.quora.com/How-do-I-send-Large-messages-80-MB卡夫卡 (2认同)