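We are running the Kafka HDFS sink connector (version 5.2.1) and need to partition the HDFS data by multiple nested fields. The data in the topic is stored as Avro and contains nested elements. However, Connect does not recognize the nested fields and throws an error saying it cannot find the field. Below is the connector configuration we are using. Does the HDFS sink connector not support partitioning by nested fields? Partitioning by non-nested fields works fine.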
{
"connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
"topics.dir": "/projects/test/kafka/logdata/coss",
"avro.codec": "snappy",
"flush.size": "200",
"connect.hdfs.principal": "test@DOMAIN.COM",
"rotate.interval.ms": "500000",
"logs.dir": "/projects/test/kafka/tmp/wal/coss4",
"hdfs.namenode.principal": "hdfs/_HOST@HADOOP.DOMAIN",
"hadoop.conf.dir": "/etc/hdfs",
"topics": "test1",
"connect.hdfs.keytab": "/etc/hdfs-qa/test.keytab",
"hdfs.url": "hdfs://nameservice1:8020",
"hdfs.authentication.kerberos": "true",
"name": "hdfs_connector_v1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://myschema:8081",
"partition.field.name": "meta.ID,meta.source,meta.HH",
"partitioner.class": "io.confluent.connect.storage.partitioner.FieldPartitioner"
}
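We cannot confirm here whether FieldPartitioner in 5.2.1 resolves dotted paths. The "cannot find the field" error suggests it looks up each configured name as a top-level field. To make the intended behavior concrete, here is a minimal pure-Java sketch. It assumes the Avro value is modeled as nested Map<String, Object> instances (inside Connect it would be a Struct). It resolves each dotted name such as meta.ID one segment at a time and joins the results into a name=value/... directory string, roughly the layout FieldPartitioner uses for top-level fields. The class and method names (NestedFieldPartitionSketch, resolve, encodePartition) are hypothetical and not part of the Confluent API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: models a record as nested maps, not a Connect Struct.
public class NestedFieldPartitionSketch {

    // Walks a dotted path such as "meta.ID" through nested maps.
    // Throws if a segment is missing or an intermediate value is not a map.
    static Object resolve(Map<String, Object> record, String dottedPath) {
        Object current = record;
        for (String segment : dottedPath.split("\\.")) {
            if (!(current instanceof Map)) {
                throw new IllegalArgumentException(
                        "Not a nested record at '" + segment + "' in " + dottedPath);
            }
            Map<?, ?> map = (Map<?, ?>) current;
            if (!map.containsKey(segment)) {
                throw new IllegalArgumentException("Cannot find field " + dottedPath);
            }
            current = map.get(segment);
        }
        return current;
    }

    // Builds a "name=value/name=value" string from a comma-separated field list,
    // using each configured name verbatim as the directory key.
    static String encodePartition(Map<String, Object> record, String partitionFieldNames) {
        List<String> parts = new ArrayList<>();
        for (String field : partitionFieldNames.split(",")) {
            String name = field.trim();
            parts.add(name + "=" + resolve(record, name));
        }
        return String.join("/", parts);
    }

    public static void main(String[] args) {
        Map<String, Object> meta = new LinkedHashMap<>();
        meta.put("ID", 42);
        meta.put("source", "app1");
        meta.put("HH", "13");
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("meta", meta);
        record.put("message", "hello");

        // prints: meta.ID=42/meta.source=app1/meta.HH=13
        System.out.println(encodePartition(record, "meta.ID,meta.source,meta.HH"));
    }
}
```

Using the full dotted name as the directory key is an arbitrary choice for this sketch. A real partitioner might use only the last segment.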
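I subscribe to a Kafka topic as shown below. I need to run some logic only after partitions have been assigned to the consumer.
However, consumer.assignment() returns an empty set no matter how long I wait. If I drop the while loop and call consumer.poll() instead, I do get records from the topic. Can anyone explain why this happens?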
consumer.subscribe(topics);
Set<TopicPartition> assigned = Collections.emptySet();
boolean isAssigned = false;
// busy-wait until the consumer reports at least one assigned partition
while (!isAssigned) {
    assigned = consumer.assignment();
    isAssigned = !assigned.isEmpty();
}
//consumer props
Properties props = new Properties();
props.put("bootstrap.servers", "xxx:9092,yyy:9092");
props.put("group.id", groupId);
props.put("enable.auto.commit", "false");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "http://xxx:8081");
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put("max.poll.records", "100");
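As we read the KafkaConsumer Javadoc, subscribe() only records the subscription. Joining the group and receiving a partition assignment happen inside poll(), and rebalance callbacks are invoked only from poll(). A loop that never calls poll() would therefore see an empty assignment() indefinitely. The toy model below makes that ordering concrete without a Kafka dependency. LazyAssignmentModel, AssignmentListener and the "topic-0" partition naming are hypothetical stand-ins, not the Kafka client API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy model (no Kafka dependency) of the subscribe/poll/assignment interplay.
public class LazyAssignmentModel {

    // Hypothetical stand-in for ConsumerRebalanceListener.onPartitionsAssigned.
    interface AssignmentListener {
        void onPartitionsAssigned(Collection<String> partitions);
    }

    private final List<String> subscribed = new ArrayList<>();
    private final Set<String> assigned = new LinkedHashSet<>();
    private AssignmentListener listener = partitions -> { };
    private boolean joined = false;

    // Like subscribe(): only records the subscription; nothing is assigned yet.
    void subscribe(List<String> topics, AssignmentListener l) {
        subscribed.clear();
        subscribed.addAll(topics);
        listener = l;
    }

    Set<String> assignment() {
        return Collections.unmodifiableSet(assigned);
    }

    // Like poll(): the first call "joins the group", assigns partitions and
    // fires the listener. Fetching records is out of scope for this model.
    List<String> poll() {
        if (!joined) {
            for (String topic : subscribed) {
                assigned.add(topic + "-0"); // pretend each topic has one partition
            }
            joined = true;
            listener.onPartitionsAssigned(assignment());
        }
        return Collections.emptyList();
    }

    public static void main(String[] args) {
        LazyAssignmentModel consumer = new LazyAssignmentModel();
        consumer.subscribe(Arrays.asList("test1"),
                parts -> System.out.println("assigned: " + parts));
        System.out.println("before poll: " + consumer.assignment()); // before poll: []
        consumer.poll();                                              // assigned: [test1-0]
        System.out.println("after poll: " + consumer.assignment());  // after poll: [test1-0]
    }
}
```

With the real client, the equivalent is usually to pass an org.apache.kafka.clients.consumer.ConsumerRebalanceListener to consumer.subscribe(topics, listener). You then put the "after assignment" logic in its onPartitionsAssigned method and keep calling poll() instead of busy-waiting on assignment().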