rap*_*ael 4 out-of-memory amazon-s3 apache-kafka-connect
我目前正在使用Kafka Connect S3接收器连接器3.3.1将Kafka消息复制到S3,处理后期数据时出现OutOfMemory错误。
我知道这似乎是一个很长的问题,但我已尽力使其清晰易懂。非常感谢您的帮助。
CustomByteArrayFormat类的角色(请参阅下面的配置)Record时间戳进行
分区和存储CustomTimeBasedPartitioner扩展io.confluent.connect.storage.partitioner.TimeBasedPartitioner,其唯一目的是重写generatePartitionedPath将主题放在路径末尾的方法。timestamp.extractor连接器的配置设置为Record发生这些OOM错误时Wallclock(例如,Kafka Connect进程的时间)请勿抛出OOM错误,并且可以处理所有最新数据,但是不再正确存储最新数据
YYYY/MM/dd/HH/mm/topic-name重新打开连接器的时间内进行存储Record时间戳正确存储数据时,它执行的并行读取次数过多,导致OOM错误
"partition.duration.ms": "600000"参数使连接器存储区数据以每小时6个10分钟的路径进行传输(2018/06/20/12/[00|10|20|30|40|50]适用于2018-06-20 12pm)24h * 6 = 144不同的S3路径中输出数据。"flush.size": "100000"意味着如果有更多的dans 100,000条消息被读取,则应将它们提交到文件中(从而释放内存)
"flush.size"读取记录的数量每个分区提交之前?还是每个连接器的任务?"rotate.schedule.interval.ms": "600000"config 的方式是,即使flush.size尚未达到100,000条消息,数据也将每10分钟提交一次。我的主要问题是,给定以下哪些数学运算可以让我计划内存使用量:
"partition.duration.ms": "600000"配置,此处为6 )S3接收器连接器配置
{
"name": "xxxxxxx",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"s3.region": "us-east-1",
"partition.duration.ms": "600000",
"topics.dir": "xxxxx",
"flush.size": "100000",
"schema.compatibility": "NONE",
"topics": "xxxxxx,xxxxxx",
"tasks.max": "16",
"s3.part.size": "52428800",
"timezone": "UTC",
"locale": "en",
"format.class": "xxx.xxxx.xxx.CustomByteArrayFormat",
"partitioner.class": "xxx.xxxx.xxx.CustomTimeBasedPartitioner",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"name": "xxxxxxxxx",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"s3.bucket.name": "xxxxxxx",
"rotate.schedule.interval.ms": "600000",
"path.format": "YYYY/MM/dd/HH/mm",
"timestamp.extractor": "Record"
}
Run Code Online (Sandbox Code Playgroud)
工作人员配置
bootstrap.servers=XXXXXX
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
consumer.auto.offset.reset=earliest
consumer.max.partition.fetch.bytes=2097152
consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
group.id=xxxxxxx
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
rest.advertised.host.name=XXXX
Run Code Online (Sandbox Code Playgroud)
编辑:
我忘了添加我遇到的错误的示例:
2018-06-21 14:54:48,644] ERROR Task XXXXXXXXXXXXX-15 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:482)
java.lang.OutOfMemoryError: Java heap space
[2018-06-21 14:54:48,645] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:483)
[2018-06-21 14:54:48,645] ERROR Task XXXXXXXXXXXXXXXX-15 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:484)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:265)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)
我终于能够理解Kafka Connect S3连接器中堆大小用法的工作原理
paths
paths分区的方式取决于partitioner.class参数。partition.duration.ms将确定每个partitioned的持续时间paths。s3.part.size每个Kafka分区(针对所有读取的主题)和每个分区分配Bytes 缓冲区。paths
timestamp.extractor设置为Record,partition.duration.ms设置为1h,s3.part.size设置为50 MB
20 * 50 MB= 1 GB;timestamp.extractor如果将其设置为Record,则具有时间戳的消息对应于早于该时间的消息,而该消息的读取时间早于该消息的时间,则该消息将被缓存在此早的缓冲区中。因此,实际上,连接器将需要最少20 * 50 MB * 2h= 2 GB的内存,因为总会出现延迟的事件,而如果事件的延迟超过1小时则更多;timestamp.extractor将设置为,则这是不正确的,Wallclock因为就Kafka Connect而言,几乎不会发生任何迟到的事件。rotate.schedule.interval.ms 时间过去了
rotate.interval.ms时间已经过去了来讲timestamp.extractor时间
timestamp.extractor将设置为Record,则10分钟的Record时间可以经过更少或更多,而10分钟的实际时间可以通过
rotate.interval.ms将其设置为10分钟,则此条件将每秒触发一次(应如此);rotate.interval.ms该条件自上次触发以来已经过去了。flush.size读取的邮件少于min(rotate.schedule.interval.ms,rotate.interval.ms)
rotate.interval.ms如果没有足够的消息,则此条件可能永远不会触发。Kafka partitions * s3.part.size至少需要计划堆大小
Record时间戳进行分区,则应将其乘以max lateness in milliseconds / partition.duration.ms
max lateness in milliseconds。consumer.max.partition.fetch.bytes当从Kafka读取数据时,S3连接器还将为每个分区缓冲字节
| 归档时间: |
|
| 查看次数: |
743 次 |
| 最近记录: |