Kafka Connect S3 Connector OutOfMemory错误与TimeBasedPartitioner

rap*_*ael 4 out-of-memory amazon-s3 apache-kafka-connect

我目前正在使用Kafka Connect S3接收器连接器3.3.1将Kafka消息复制到S3,处理后期数据时出现OutOfMemory错误。

我知道这似乎是一个很长的问题,但我已尽力使其清晰易懂。非常感谢您的帮助。

高级信息

  • 连接器对Kafka消息进行简单的字节到字节的复制,并在字节数组的开头添加消息的长度(出于解压缩目的)。
    • 这是CustomByteArrayFormat类的角色(请参阅下面的配置)
  • 数据根据Record时间戳进行 分区和存储
    • CustomTimeBasedPartitioner扩展io.confluent.connect.storage.partitioner.TimeBasedPartitioner,其唯一目的是重写generatePartitionedPath将主题放在路径末尾的方法。
  • Kafka Connect进程的总堆大小为24GB(仅一个节点)
  • 连接器每秒处理8,000至10,000条消息
  • 每封邮件的大小接近1 KB
  • Kafka主题有32个分区

内存不足错误的上下文

  • 这些错误仅在连接器关闭数小时并必须赶上数据时才会发生
  • 重新打开连接器时,它开始追赶,但由于OutOfMemory错误而很快失败

可能但不完整的解释

  • timestamp.extractor连接器的配置设置为Record发生这些OOM错误时
  • 将此配置切换为Wallclock(例如,Kafka Connect进程的时间)请勿抛出OOM错误,并且可以处理所有最新数据,但是不再正确存储最新数据
    • 所有的最新数据将在YYYY/MM/dd/HH/mm/topic-name重新打开连接器的时间内进行存储
  • 所以我的猜测是,当连接器尝试根据Record时间戳正确存储数据时,它执行的并行读取次数过多,导致OOM错误
    • "partition.duration.ms": "600000"参数使连接器存储区数据以每小时6个10分钟的路径进行传输(2018/06/20/12/[00|10|20|30|40|50]适用于2018-06-20 12pm)
    • 因此,对于24小时的延迟数据,连接器将不得不在24h * 6 = 144不同的S3路径中输出数据。
    • 每个10分钟的文件夹包含10,000条消息/秒* 600秒= 6,000,000条消息,大小为6 GB
    • 如果确实并行读取,则将使864GB数据进入内存
  • 我认为我必须正确配置一组给定的参数以避免这些OOM错误,但是我不觉得我看到了大图
    • "flush.size": "100000"意味着如果有更多的dans 100,000条消息被读取,则应将它们提交到文件中(从而释放内存)
      • 对于1KB的消息,这意味着每100MB提交一次
      • 但是,即使有144个并行读取,也仍然只能提供14.4 GB的总空间,这比可用的堆大小少24GB
      • "flush.size"读取记录的数量每个分区提交之前?还是每个连接器的任务
    • 我理解"rotate.schedule.interval.ms": "600000"config 的方式是,即使flush.size尚未达到100,000条消息,数据也将每10分钟提交一次。

我的主要问题是,给定以下哪些数学运算可以让我计划内存使用量:

  • 每秒的数量或记录
  • 记录的大小
  • 我从中读取的主题的Kafka分区数
  • 连接器任务的数量(如果相关)
  • 每小时写入的存储桶数(由于"partition.duration.ms": "600000"配置,此处为6 )
  • 延迟数据处理的最大小时数

构型

S3接收器连接器配置

{
  "name": "xxxxxxx",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "s3.region": "us-east-1",
    "partition.duration.ms": "600000",
    "topics.dir": "xxxxx",
    "flush.size": "100000",
    "schema.compatibility": "NONE",
    "topics": "xxxxxx,xxxxxx",
    "tasks.max": "16",
    "s3.part.size": "52428800",
    "timezone": "UTC",
    "locale": "en",
    "format.class": "xxx.xxxx.xxx.CustomByteArrayFormat",
    "partitioner.class": "xxx.xxxx.xxx.CustomTimeBasedPartitioner",
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "name": "xxxxxxxxx",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "s3.bucket.name": "xxxxxxx",
    "rotate.schedule.interval.ms": "600000",
    "path.format": "YYYY/MM/dd/HH/mm",
    "timestamp.extractor": "Record"
}
Run Code Online (Sandbox Code Playgroud)

工作人员配置

bootstrap.servers=XXXXXX
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
consumer.auto.offset.reset=earliest
consumer.max.partition.fetch.bytes=2097152
consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
group.id=xxxxxxx
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
rest.advertised.host.name=XXXX
Run Code Online (Sandbox Code Playgroud)

编辑

我忘了添加我遇到的错误的示例:

2018-06-21 14:54:48,644] ERROR Task XXXXXXXXXXXXX-15 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:482)
java.lang.OutOfMemoryError: Java heap space
[2018-06-21 14:54:48,645] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:483)
[2018-06-21 14:54:48,645] ERROR Task XXXXXXXXXXXXXXXX-15 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:484)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:265)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)

rap*_*ael 7

我终于能够理解Kafka Connect S3连接器中堆大小用法的工作原理

  • S3连接器会将每个Kafka分区的数据写入分区 paths
    • 这些paths分区的方式取决于partitioner.class参数。
    • 默认情况下,它是通过时间戳记的,然后值partition.duration.ms将确定每个partitioned的持续时间paths
  • S3连接器将为s3.part.size每个Kafka分区(针对所有读取的主题)和每个分区分配Bytes 缓冲区。paths
    • 读取20个分区的示例,timestamp.extractor设置为Recordpartition.duration.ms设置为1h,s3.part.size设置为50 MB
      • 每小时所需的堆大小等于20 * 50 MB= 1 GB;
      • 但是,timestamp.extractor如果将其设置为Record,则具有时间戳的消息对应于早于该时间的消息,而该消息的读取时间早于该消息的时间,则该消息将被缓存在此早的缓冲区中。因此,实际上,连接器将需要最少20 * 50 MB * 2h= 2 GB的内存,因为总会出现延迟的事件,而如果事件的延迟超过1小时则更多;
      • 请注意,如果timestamp.extractor将设置为,则这是不正确的,Wallclock因为就Kafka Connect而言,几乎不会发生任何迟到的事件。
    • 这些缓冲区在3种情况下被刷新(即离开内存)
      • rotate.schedule.interval.ms 时间过去了
        • 总是触发此冲洗条件。
      • rotate.interval.ms时间已经过去了来讲timestamp.extractor时间
        • 这意味着如果timestamp.extractor将设置为Record,则10分钟的Record时间可以经过更少或更多,而10分钟的实际时间可以通过
          • 例如,在处理较晚的数据时,将在几秒钟内处理价值10分钟的数据,如果rotate.interval.ms将其设置为10分钟,则此条件将每秒触发一次(应如此);
          • 相反,如果事件流中有暂停,则该条件将不会触发,直到它看到带有时间戳的事件,表明rotate.interval.ms该条件自上次触发以来已经过去了。
      • flush.size读取的邮件少于min(rotate.schedule.interval.msrotate.interval.ms)
        • 至于,rotate.interval.ms如果没有足够的消息,则此条件可能永远不会触发。
    • 因此,您Kafka partitions * s3.part.size至少需要计划堆大小
      • 如果您使用Record时间戳进行分区,则应将其乘以max lateness in milliseconds / partition.duration.ms
        • 在最坏的情况下,您在所有分区和的所有范围中都有持续不断的延迟事件max lateness in milliseconds
  • consumer.max.partition.fetch.bytes当从Kafka读取数据时,S3连接器还将为每个分区缓冲字节
    • 默认情况下,此设置为2.1 MB。
  • 最后,您不应该考虑所有的堆大小都可用于缓冲Kafka消息,因为其中还有很多不同的对象
    • 一个安全的考虑是要确保Kafka消息的缓冲不会超过可用堆总大小的50%。

  • 谢谢吉多,非常感谢。我很高兴它很有用。 (2认同)