标签: apache-kafka-connect

在 Kubernetes 上安装自定义连接器到 Kafka Connect

我正在运行 kafka kubenetes helm 部署,但是我不确定如何安装自定义插件。

在本地版本的 kafka 上运行自定义插件时,我将卷安装/myplugin到 Docker 映像,然后设置插件路径环境变量。

我不确定如何将此工作流程应用于 helm Charts/kubernetes 部署,主要是如何将插件安装到 Kafka Connect pod,以便可以在 default 中找到它plugin.path=/usr/share/java

apache-kafka kubernetes apache-kafka-connect

1
推荐指数
1
解决办法
2481
查看次数

kafka connect 和 kafka 主机要求

我正在使用 Couchbase 水槽连接器。CB 和 kafka 位于不同 AWS 区域的 2 个不同 EC2 实例中。我正在关注这些文档:

  1. https://docs.couchbase.com/kafka-connector/current/quickstart.html
  2. https://kafka.apache.org/documentation/#connect_configuring

基于这些,我认为 connect 必须在也安装了 kafka 的主机上运行。我的连接是否可以在远程主机上运行,​​以便我从远程 kafka 读取并将消息接收到远程 CB 存储桶中?有专门针对此的文档吗?

另外,我收到以下错误:

        at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:79)
    at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:66)
    at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:118)
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to find any class that implements Connector and which name matches com.couchbase.connect.kafka.CouchbaseSinkConnector, available connectors are: PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSinkConnector, name='org.apache.kafka.connect.file.FileStreamSinkConnector', version='2.3.0', encodedVersion=2.3.0, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSourceConnector, name='org.apache.kafka.connect.file.FileStreamSourceConnector', version='2.3.0', encodedVersion=2.3.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockConnector, name='org.apache.kafka.connect.tools.MockConnector', version='2.3.0', encodedVersion=2.3.0, type=connector, typeName='connector', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='2.3.0', encodedVersion=2.3.0, type=sink, typeName='sink', …
Run Code Online (Sandbox Code Playgroud)

couchbase apache-kafka apache-kafka-connect

1
推荐指数
1
解决办法
479
查看次数

无法将主题映射到 kafka elasticsearch 连接器中的指定索引

尝试将主题“name:localtopic”映射到索引“name:indexoftopic”,它在弹性搜索“localtopic和indexoftopic”中创建两个新索引,并且主题数据仅在主题名称索引“localtopic”中可见,连接器中没有显示错误(分布式模式)

我的配置是

 "config" : {
  "connector.class" : "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "tasks.max" : "1",
  "topics" : "localtopic", 
  "topic.index.map" : "localtopic:indexoftopic",
  "connection.url" : "aws elasticsearch url",
  "type.name" : "event",
  "key.ignore" : "false",
  "schema.ignore" : "true",
  "schemas.enable" : "false",
  "transforms" : "InsertKey,extractKey",
  "transforms.InsertKey.type" : "org.apache.kafka.connect.transforms.ValueToKey",
  "transforms.InsertKey.fields" : "event-id",
  "transforms.extractKey.type" : "org.apache.kafka.connect.transforms.ExtractField$Key",
  "transforms.extractKey.field" : "event-id"
 }
Run Code Online (Sandbox Code Playgroud)

索引名称:indexoftopic是在elasticsearch中创建的,但数据是通过index_name:localtopic kafkaversion:2.3连接器版本:5 elasticsearch版本:3.2.0看到的

即使在日志 INFO --topics.regex = "" 中,我也不知道 ihis 选项,任何人都可以建议。怎么用这个???

elasticsearch apache-kafka apache-kafka-connect

1
推荐指数
1
解决办法
1969
查看次数

从 Rest API 发送数据到 kafka

我刚刚开始学习 Kafka,我正在尝试构建一个原型来拥有一个 REST API 生产者并将数据发送给 Kafka 消费者。我查阅了大量文档来找出一些特定的程序。

我无法理解是否有一个连接器可以像为 Apache Kafka 提供的文件连接器或 JDBC 连接器一样使用。我应该为此编写一个自定义连接器吗?

我很困惑从哪里开始。我特别寻找一些关于如何完成这项工作的结构化文档或想法。

apache-kafka kafka-producer-api apache-kafka-connect

1
推荐指数
1
解决办法
1万
查看次数

Kafka connect docker image - 无法找到任何实现 Connector 且名称与 ElasticsearchSinkConnector 匹配的类

我已经使用 kafka-connect 图像confluenceinc/cp-kafka-connect一段时间了。根据 Confluence 文档,这个 docker 镜像附带了预安装的连接器插件,包括 Elastic。

我以前一直使用5.4.1-ccs效果很好的版本,我可以添加弹性接收器连接器配置,它们工作得很好。但是我尝试更新 confluentinc/cp-kafka-connect到最新版本v6.0.1,但现在出现错误。

ConnectException: Failed to find any class that implements Connector and which name matches ElasticsearchSinkConnector

我在 Confluence 网站上阅读了很多文档,但有点零星。我知道问题是插件未安装,因为它们被从新的 docker 映像中删除或路径错误(不确定是哪一个)。

我该如何解决这个问题?(注意:我也编写了自己的java插件,所以两者都需要工作)

这是我docker-compose目前的文件(同样,这适用于 version 5.4.1-ccs

kafka-connect-node-1:
  image: confluentinc/cp-kafka-connect:5.4.1 #using old version because of breaking change
  hostname: kafka-connect-node-1
  ports:
    - '8083:8083'
  environment:
    CONNECT_BOOTSTRAP_SERVERS: [MY_SERVER]
    CONNECT_REST_PORT: 8083
    CONNECT_GROUP_ID: compose-connect-group
    CONNECT_CONFIG_STORAGE_TOPIC: connect-configs
    CONNECT_OFFSET_STORAGE_TOPIC: connect-offsets
    CONNECT_STATUS_STORAGE_TOPIC: connect-status
    CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
    CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://kafka-schema-registry:8084'
    CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
    CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://kafka-schema-registry:8084'
    CONNECT_INTERNAL_KEY_CONVERTER: …
Run Code Online (Sandbox Code Playgroud)

apache-kafka docker apache-kafka-connect confluent-platform

1
推荐指数
1
解决办法
1806
查看次数

Kafka Connect 不输出 JSON

我正在使用 JDBC Kafka 连接器将数据从数据库读取到 Kafka。这有效,但它总是以 Avro 格式输出数据,即使我已经指定它应该使用 JSON。我知道这样做是因为当我在 python 中使用来自该主题的消息时,我会在每条消息的顶部看到模式。

我像这样运行连接器:

/usr/bin/connect-standalone /etc/schema-registry/connect-json-standalone.properties /etc/kafka-connect-jdbc/view.properties
Run Code Online (Sandbox Code Playgroud)

connect-json-standalone.properties 文件的内容是:

bootstrap.servers=localhost:9092

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schema.registry.url=http://localhost:8081
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.schemas.enable=true

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# Local storage file for offset data
offset.storage.file.filename=/tmp/connect.offsets
Run Code Online (Sandbox Code Playgroud)

/etc/kafka-connect-jdbc/view.properties 的内容是:

name=view-small-jdbc-daily
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:teradata://domain.com/charset=UTF8,DBS_PORT=1025,DATABASE=test,USER=***,PASSWORD=***,LOB_SUPPORT=OFF
mode=bulk
table.whitelist=test_table
topic.prefix=view5-
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-connect

0
推荐指数
1
解决办法
4058
查看次数

如何在 Kafka-Connect API 中设置 max.poll.records

我正在使用 confluent-3.0.1 平台并构建一个 Kafka-Elasticsearch 连接器。为此,我扩展了 SinkConnector 和 SinkTask(Kafka 连接 API)以从 Kafka 获取数据。

作为此代码的一部分,我正在扩展 SinkConnector 的 taskConfigs 方法以返回“max.poll.records”以一次仅获取 100 条记录。但它不起作用,我同时获取所有记录,但我没有在规定的时间内提交偏移量。请任何人帮我配置“max.poll.records”

 public List<Map<String, String>> taskConfigs(int maxTasks) {
    ArrayList<Map<String, String>> configs = new ArrayList<Map<String, String>>();
    for (int i = 0; i < maxTasks; i++) {
      Map<String, String> config = new HashMap<String, String>();
      config.put(ConfigurationConstants.CLUSTER_NAME, clusterName);
      config.put(ConfigurationConstants.HOSTS, hosts);
      config.put(ConfigurationConstants.BULK_SIZE, bulkSize);
      config.put(ConfigurationConstants.IDS, elasticSearchIds);
      config.put(ConfigurationConstants.TOPICS_SATELLITE_DATA, topics);
      config.put(ConfigurationConstants.PUBLISH_TOPIC, topicTopublish);
      config.put(ConfigurationConstants.TYPES, elasticSearchTypes);
      config.put("max.poll.records", "100");

      configs.add(config);
    }
    return configs;
  }
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-connect

0
推荐指数
1
解决办法
6005
查看次数

融合Kafka Connect Elasticsearch文档ID创建

我正在使用汇合来连接我的数据库和ES,但出现以下异常:

org.apache.kafka.connect.errors.DataException: STRUCT is not supported as the document id.
    at io.confluent.connect.elasticsearch.DataConverter.convertKey(DataConverter.java:75)
    at io.confluent.connect.elasticsearch.DataConverter.convertRecord(DataConverter.java:84)
    at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:210)
    at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:119)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:429)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)

我在卡夫卡连接-JDBC配置是:

name=task-view-list-stage
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10 
connection.url=jdbc:postgresql://localhost:5432/postgres?user=postgres&password=test
table.types=TABLE
query=select * from employee_master
mode=timestamp+incrementing
incrementing.column.name=employee_master_id
timestamp.column.name=modified_date
validate.non.null=false
topic.prefix=my-id-app
Run Code Online (Sandbox Code Playgroud)

我的kafka-connect Elasticsearch配置是:

name=es-id-view
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=my-id-app
topics.key.ignore=false
transforms=InsertKey
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=employee_master_id
connection.url=http://localhost:9200
type.name=type_id
Run Code Online (Sandbox Code Playgroud)

我的表结构是:

employee_master_id | emp_name | modified_date
-----------------------------------------------------------
1                  |  Bala    |  "2017-05-18 …
Run Code Online (Sandbox Code Playgroud)

jdbc elasticsearch apache-kafka confluent apache-kafka-connect

0
推荐指数
1
解决办法
962
查看次数

在Kafka Connect中使用自定义转换器吗?

我正在尝试将自定义转换器与Kafka Connect一起使用,但似乎无法正确使用。我希望有人对此有经验,可以帮助我解决!

初始情况

怎么了 ?

当连接器启动时,它们会正确加载罐子并找到自定义转换器。确实,这是我在日志中看到的内容:

[2017-10-10 13:06:46,274] INFO Registered loader: PluginClassLoader{pluginLocation=file:/opt/custom-connectors/custom-converter-1.0-SNAPSHOT.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:199)
[2017-10-10 13:06:46,274] INFO Added plugin 'custom.CustomStringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[...]
[2017-10-10 13:07:43,454] INFO Added aliases 'CustomStringConverter' and 'CustomString' to plugin 'custom.CustomStringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
Run Code Online (Sandbox Code Playgroud)

然后,我将JSON配置发布到连接器节点之一以创建我的连接器:

{
  "name": "hdfsSinkCustom",
  "config": {
    "topics": "yellow",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "custom.CustomStringConverter",
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "hdfs.url": "hdfs://hdfs-namenode:8020/hdfs-sink",
    "topics.dir": "yellow_storage",
    "flush.size": "1",
    "rotate.interval.ms": "1000"
  }
}
Run Code Online (Sandbox Code Playgroud)

并收到以下回复:

{
   "error_code": 400,
   "message": "Connector configuration is invalid and contains …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-connect

0
推荐指数
1
解决办法
2666
查看次数

Kafka Connect无法使用主题策略

语境

我编写了几个小型的Kafka Connect连接器。一个每秒仅生成随机数据,另一个每秒将其记录在控制台中。它们与Schema Registry集成在一起,因此可以使用Avro序列化数据。

我使用Landoop提供fast-data-dev Docker映像将它们部署到本地Kafka环境中

基本设置有效,并每秒产生一条记录的消息

但是,我想更改主题名称策略。默认一生成两个主题:

  • ${topic}-key
  • ${topic}-value

根据我的用例,我将需要生成具有不同模式的事件,这些事件最终会出现在同一主题上。因此,我需要的主题名称是:

  • ${topic}-${keyRecordName}
  • ${topic}-${valueRecordName}

根据文档,我的需求适合TopicRecordNameStrategy

我尝试了什么

我创建avroData用于发送值进行连接的对象:

class SampleSourceConnectorTask : SourceTask() {

    private lateinit var avroData: AvroData 

    override fun start(props: Map<String, String>) {
        [...]
        avroData = AvroData(AvroDataConfig(props))
    }
Run Code Online (Sandbox Code Playgroud)

然后用它来创建SourceRecord响应对象

该文档指出,为了在Kafka Connect中使用架构注册表,我必须在连接器配置中设置一些属性。因此,当我创建它时,将它们添加:

name=SampleSourceConnector
connector.class=[...]
tasks.max=1
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
Run Code Online (Sandbox Code Playgroud)

问题

连接器似乎忽略了这些属性,并继续使用旧的${topic}-key${topic}-value主题。

Kafka Connect应该支持不同的主题策略。我设法通过编写自己的版本AvroConverter和硬编码来解决此问题,该主题策略是我所需要的。但是,这似乎不是一种好方法,并且在尝试使用Sink …

avro apache-kafka apache-kafka-connect confluent-schema-registry

0
推荐指数
1
解决办法
658
查看次数