Rai*_*Dan 5 java apache-kafka apache-kafka-connect
我正在使用自定义的Kafka连接器(使用Kafka Connect的Java API用Java编写)从外部源提取数据并存储在主题中。我需要设置自定义分区策略。我了解可以通过设置属性在Kafka Producer中设置自定义分区程序。但是,此属性对于Kafka连接器似乎没有任何作用。如何配置Kafka Connect(我正在使用脚本来运行连接器)以使用编写的自定义?partitioner.classconnect-standalonePartitioner
源连接器可以控制到每个源记录通过写入分区SourceRecord的partition字段。如果这是您自己的连接器,则这是最直接的。
但是,如果要更改源连接器对每个记录进行分区的方式,可以使用覆盖partition源记录字段的单消息转换(SMT)。您可能必须通过实现org.apache.kafka.connect.transforms.Transformation和使用自己的分区逻辑来编写自定义SMT ,但这实际上比编写自定义Kafka分区程序要容易一些。
例如,这是一个概念上的自定义转换,该转换显示了如何使用配置属性以及如何SourceRecord使用所需的分区号创建新实例。该示例是不完整的,因为它实际上没有任何真正的分区逻辑,但这应该是一个很好的起点。
包io.acme.example;
导入org.apache.kafka.common.config.AbstractConfig;
导入org.apache.kafka.common.config.ConfigDef;
导入org.apache.kafka.common.config.ConfigDef.Importance;
导入org.apache.kafka.common.config.ConfigDef.Type;
导入org.apache.kafka.connect.source.SourceRecord;
导入org.apache.kafka.connect.transforms.Transformation;
导入java.util.Map;
公共类CustomPartitioner实现了Transformation {
私有静态最终字符串MAX_PARTITIONS_CONFIG =“ max.partitions”;
private static final String MAX_PARTITIONS_DOC =“最大分区数”;
私有静态最终整数MAX_PARTITIONS_DEFAULT = 1;
/ **
*配置的定义。我们只在这里定义一个配置属性,
*但是您可以将多个“定义”方法链接在一起。复杂的配置可能需要保证
*将所有与配置相关的内容拉入扩展{@link AbstractConfig}的单独类中
*并添加辅助方法(例如“ getMaxPartitions()”),然后您将使用此类来解析
* {@link #configure(Map)}中的参数,而不是{@link AbstractConfig}中的参数。
* /
私有静态最终ConfigDef CONFIG_DEF = new ConfigDef()。define(MAX_PARTITIONS_CONFIG,Type.INT,MAX_PARTITIONS_DEFAULT,Importance.HIGH,MAX_PARTITIONS_DOC);
private int maxPartitions;
@Override
public void configure(地图配置){
//将任何配置参数存储为字段...
AbstractConfig config =新的AbstractConfig(CONFIG_DEF,configs);
maxPartitions = config.getInt(MAX_PARTITIONS_CONFIG);
}
@Override
public SourceRecord apply(SourceRecord record){
//在此处计算所需的分区
int actualPartition = record.kafkaPartition();
int wantedPartition = ...
//然后使用除新分区外的所有现有字段创建新记录...
返回record.newRecord(record.topic(),desiredPartition,
record.keySchema(),record.key(),
record.valueSchema(),record.value(),
record.timestamp());
}
@Override
公共ConfigDef config(){
返回CONFIG_DEF;
}
@Override
公共无效close(){
// 没做什么
}
}
该ConfigDef和AbstractConfig功能是非常有用的,可以做很多更有趣的事情,包括使用自定义验证和推荐者,以及具有依赖于其他属性配置属性。如果您想了解更多有关此的信息,请查看一些使用相同框架的现有Kafka Connect连接器。
最后一件事。当运行Kafka Connect独立或分布式工作程序时,但请确保将CLASSPATH环境变量设置为指向包含自定义SMT的JAR文件以及SMT所依赖的JAR文件,但 Kafka提供的文件除外。该connect-standalone.sh和connect-distributed.sh命令会自动添加卡夫卡的JAR文件到类路径中。
| 归档时间: |
|
| 查看次数: |
2282 次 |
| 最近记录: |