Dan*_*iel 5 amazon-s3 apache-kafka confluent apache-kafka-connect
我正在尝试使用Confluent S3接收器的TimeBasedPartitioner.这是我的配置:
{
"name":"s3-sink",
"config":{
"connector.class":"io.confluent.connect.s3.S3SinkConnector",
"tasks.max":"1",
"file":"test.sink.txt",
"topics":"xxxxx",
"s3.region":"yyyyyy",
"s3.bucket.name":"zzzzzzz",
"s3.part.size":"5242880",
"flush.size":"1000",
"storage.class":"io.confluent.connect.s3.storage.S3Storage",
"format.class":"io.confluent.connect.s3.format.avro.AvroFormat",
"schema.generator.class":"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"timestamp.extractor":"Record",
"timestamp.field":"local_timestamp",
"path.format":"YYYY-MM-dd-HH",
"partition.duration.ms":"3600000",
"schema.compatibility":"NONE"
}
Run Code Online (Sandbox Code Playgroud)
}
数据是二进制的,我使用avro方案.我想使用实际记录字段"local_timestamp",它是一个UNIX时间戳来对数据进行分区,比如分成小时文件.
我使用通常的REST API调用启动连接器
curl -X POST -H "Content-Type: application/json" --data @s3-config.json http://localhost:8083/connectors
Run Code Online (Sandbox Code Playgroud)
不幸的是,数据没有按照我的意愿进行分区.我也试图删除刷新大小,因为这可能会干扰.但后来我得到了错误
{"error_code":400,"message":"Connector configuration is invalid and contains the following 1 error(s):\nMissing required configuration \"flush.size\" which has no default value.\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"}%
Run Code Online (Sandbox Code Playgroud)
知道如何正确设置TimeBasedPartioner吗?我找不到一个有效的例子.
另外,如何调试这样的问题或进一步了解连接器实际上在做什么?
非常感谢任何帮助或进一步的建议.
在研究了TimeBasedPartitioner.java上的代码和日志后
confluent log connect tail -f
Run Code Online (Sandbox Code Playgroud)
我意识到时区和语言环境都是必需的,尽管在Confluent S3 Connector文档中没有这样说明.以下配置字段解决了问题,让我上传正确分区到S3存储桶的记录:
"flush.size": "10000",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"locale": "US",
"timezone": "UTC",
"partition.duration.ms": "3600000",
"timestamp.extractor": "RecordField",
"timestamp.field": "local_timestamp",
Run Code Online (Sandbox Code Playgroud)
还要注意两件事:首先,flush.size的值也是必需的,文件最终被分区为更小的块,不大于flush.size指定的块.其次,如上所示更好地选择path.format,从而生成适当的树结构.
我仍然不能100%确定是否真的使用记录字段local_timestamp来对记录进行分区.
任何评论或改进都非常受欢迎.
事实上,您修改后的配置似乎是正确的。
具体来说,设置timestamp.extractor为RecordField允许您根据记录具有的时间戳字段对文件进行分区,并且您通过设置 属性 来识别该时间戳字段timestamp.field。
当设置一组时timestamp.extractor=Record,基于时间的分区器将为每条记录使用 Kafka 时间戳。
关于flush.size,将此属性设置为较高值(例如Integer.MAX_VALUE)实际上等同于忽略它。
最后,schema.generator.class在最新版本的连接器中不再需要。
| 归档时间: |
|
| 查看次数: |
3493 次 |
| 最近记录: |