在Kafka连接器中设置分区策略

Rai*_*Dan 5 java apache-kafka apache-kafka-connect

我正在使用自定义的Kafka连接器(使用Kafka Connect的Java API用Java编写)从外部源提取数据并存储在主题中。我需要设置自定义分区策略。我了解可以通过设置属性在Kafka Producer中设置自定义分区程序。但是,此属性对于Kafka连接器似乎没有任何作用。如何配置Kafka Connect(我正在使用脚本来运行连接器)以使用编写的自定义?partitioner.classconnect-standalonePartitioner

Ran*_*uch 5

源连接器可以控制到每个源记录通过写入分区SourceRecordpartition字段。如果这是您自己的连接器,则这是最直接的。

但是,如果要更改源连接器对每个记录进行分区的方式,可以使用覆盖partition源记录字段的单消息转换(SMT)。您可能必须通过实现org.apache.kafka.connect.transforms.Transformation和使用自己的分区逻辑来编写自定义SMT ,但这实际上比编写自定义Kafka分区程序要容易一些。

例如,这是一个概念上的自定义转换,该转换显示了如何使用配置属性以及如何SourceRecord使用所需的分区号创建新实例。该示例是不完整的,因为它实际上没有任何真正的分区逻辑,但这应该是一个很好的起点。

包io.acme.example;

导入org.apache.kafka.common.config.AbstractConfig;
导入org.apache.kafka.common.config.ConfigDef;
导入org.apache.kafka.common.config.ConfigDef.Importance;
导入org.apache.kafka.common.config.ConfigDef.Type;
导入org.apache.kafka.connect.source.SourceRecord;
导入org.apache.kafka.connect.transforms.Transformation;

导入java.util.Map;

公共类CustomPartitioner实现了Transformation {

  私有静态最终字符串MAX_PARTITIONS_CONFIG =“ max.partitions”;
  private static final String MAX_PARTITIONS_DOC =“最大分区数”;
  私有静态最终整数MAX_PARTITIONS_DEFAULT = 1;

  / ** 
   *配置的定义。我们只在这里定义一个配置属性,
   *但是您可以将多个“定义”方法链接在一起。复杂的配置可能需要保证
   *将所有与配置相关的内容拉入扩展{@link AbstractConfig}的单独类中
   *并添加辅助方法(例如“ getMaxPartitions()”),然后您将使用此类来解析
   * {@link #configure(Map)}中的参数,而不是{@link AbstractConfig}中的参数。
   * /
  私有静态最终ConfigDef CONFIG_DEF = new ConfigDef()。define(MAX_PARTITIONS_CONFIG,Type.INT,MAX_PARTITIONS_DEFAULT,Importance.HIGH,MAX_PARTITIONS_DOC)​​;

  private int maxPartitions;

  @Override
  public void configure(地图配置){
    //将任何配置参数存储为字段...
    AbstractConfig config =新的AbstractConfig(CONFIG_DEF,configs);
    maxPartitions = config.getInt(MAX_PARTITIONS_CONFIG);
  }

  @Override
  public SourceRecord apply(SourceRecord record){
    //在此处计算所需的分区
    int actualPartition = record.kafkaPartition();
    int wantedPartition = ...
    //然后使用除新分区外的所有现有字段创建新记录...
    返回record.newRecord(record.topic(),desiredPartition,
                            record.keySchema(),record.key(),
                            record.valueSchema(),record.value(),
                            record.timestamp());
  }

  @Override
  公共ConfigDef config(){
    返回CONFIG_DEF;
  }

  @Override
  公共无效close(){
    // 没做什么
  }
}

ConfigDefAbstractConfig功能是非常有用的,可以做很多更有趣的事情,包括使用自定义验证和推荐者,以及具有依赖于其他属性配置属性。如果您想了解更多有关此的信息,请查看一些使用相同框架的现有Kafka Connect连接器。

最后一件事。当运行Kafka Connect独立或分布式工作程序时,但请确保将CLASSPATH环境变量设置为指向包含自定义SMT的JAR文件以及SMT所依赖的JAR文件, Kafka提供的文件除外。该connect-standalone.shconnect-distributed.sh命令会自动添加卡夫卡的JAR文件到类路径中。