标签: apache-kafka-connect

Kafka-Connect vs Filebeat和Logstash

我希望从Kafka消费数据并将数据保存到Hadoop和Elasticsearch中。我目前已经看到了两种方法:使用Filebeat从Kafka消费并将其发送到ES,以及使用Kafka-Connect框架。有一个Kafka-Connect-HDFS和Kafka-Connect-Elasticsearch模块。

我不确定要使用哪个发送流数据。虽然我认为如果我想在某个时候从Kafka中获取数据并将其放入Cassandra中,我可以为此使用Kafka-Connect模块,但是Filebeat没有这样的功能。

elasticsearch filebeat apache-kafka-connect

6
推荐指数
1
解决办法
4027
查看次数

重置JDBC Kafka Connector以便从时间开始开始拉行吗?

Kafka连接器可以利用主键和时间戳来确定需要处理的行。

我正在寻找一种重置连接器的方法,以便从时间开始进行处理。

sql-server apache-kafka apache-kafka-connect

6
推荐指数
1
解决办法
1710
查看次数

从 Kafka Connect 到 S3 的 Parquet 输出

我看到 Kafka Connect 可以以 Avro 或 JSON 格式写入 S3。但是没有 Parquet 支持。添加这个会有多难?

apache-kafka parquet apache-kafka-connect

6
推荐指数
1
解决办法
3877
查看次数

即使 json 数据包含架构和有效负载字段,kafka 连接 hdfs 接收器连接器也失败

我正在尝试使用 kafka 连接 hdfs 接收器连接器将 json 数据从 kafka 移动到 hdfs。

即使 kafka 中的 json 数据具有架构和有效负载 kafka 连接任务也失败并出现错误

org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields.
Run Code Online (Sandbox Code Playgroud)

卡夫卡中的数据:

./bin/kafka-console-consumer --topic test_hdfs_json_schema_payload_1 --zookeeper localhost:2181 --from-beginning

{"schema": {"type": "struct","fields": [{"type": "string","optional": false,"field": "Name"}, {"type": "string","optional": false,"field": "company"}],"optional": false,"name": "Person"},"payload": {"Name": "deepak","company": "BT"}}
{"schema": {"type": "struct","fields": [{"type": "string","optional": false,"field": "Name"}, {"type": "string","optional": false,"field": "company"}],"optional": false,"name": "Person"},"payload": {"Name": "sufi","company": "BT"}}
{"schema": {"type": "struct","fields": [{"type": "string","optional": false,"field": "Name"}, {"type": "string","optional": …
Run Code Online (Sandbox Code Playgroud)

hdfs apache-kafka apache-kafka-connect

6
推荐指数
1
解决办法
5280
查看次数

Kafka Connect 进入重新平衡循环

我刚刚在两个实例的集群(2 台机器上的 2 个容器)上部署了我的 Kafka Connect(我只使用连接源到 MQTT)应用程序,现在它似乎进入了一种重新平衡循环,我有一个开始时有一点数据,但没有出现新数据。这是我在日志中得到的。

[2017-08-11 07:27:35,810] INFO Joined group and got assignment: Assignment{error=0, leader='connect-1-592bcc91-9d99-4c54-b707-3f52d0f8af50', leaderUrl='http:// 10.120.233.78:9040/', offset=2, connectorIds=[SourceConnector1], taskIds=[]} (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1009)
[2017-08-11 07:27:35,810] WARN Catching up to assignment's config offset. (org.apache.kafka.connect.runtime.distributed.DistributedHerder:679)
[2017-08-11 07:27:35,810] INFO Current config state offset 1 is behind group assignment 2, reading to end of config log (org.apache.kafka.connect.runtime.distributed.DistributedHerder:723)
[2017-08-11 07:27:36,310] INFO Finished reading to end of log and updated config snapshot, new config log offset: 1 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:727)
[2017-08-11 07:27:36,310] INFO Current config state offset 1 …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-consumer-api apache-kafka-connect

6
推荐指数
1
解决办法
1283
查看次数

How to transform and extract fields in Kafka sink JDBC connector

I am using a 3rd party CDC tool that replicates data from a source database into Kafka topics. An example row is shown below:

{  
   "data":{  
      "USER_ID":{  
         "string":"1"
      },
      "USER_CATEGORY":{  
         "string":"A"
      }
   },
   "beforeData":{  
      "Data":{  
         "USER_ID":{  
            "string":"1"
         },
         "USER_CATEGORY":{  
            "string":"B"
         }
      }
   },
   "headers":{  
      "operation":"UPDATE",
      "timestamp":"2018-05-03T13:53:43.000"
   }
}
Run Code Online (Sandbox Code Playgroud)

What configuration is needed in the sink file in order to extract all the (sub)fields under data and headers and ignore those under beforeData so that the target table in which the …

apache-kafka apache-kafka-connect

6
推荐指数
2
解决办法
6019
查看次数

将 jar 添加到 Confluent Docker 中的通用 Kafka Connect 类路径

我正在为 Kafka connect v4.1.1 使用融合的 docker,并想添加一个带有特定 log4j 附加程序的 jar。
通过连接器的类路径隔离,我不确定在融合的 docker 中将该 jar 放在何处,因为它是由父 kafka 连接本身而不是连接器使用的。

在此先感谢您的帮助!

apache-kafka docker apache-kafka-connect confluent-platform

6
推荐指数
1
解决办法
4322
查看次数

Kafka连接mysql自定义查询

我在 kafka connect 的帮助下完成了增量数据同步。现在我想通过自定义查询实现相同的目标。但我收到错误。

我的配置文件是

name=mysql-whitelist-timestamp-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector 
tasks.max=1 
connection.url=jdbc:mysql://127.0.0.1:3306/demouser=root&password=root 
query=select name from students3 where marks = 10 
mode=timestamp table.whitelist=students3 
timestamp.column.name=timestamp 
topic.prefix=test-mysql-jdbc-
Run Code Online (Sandbox Code Playgroud)

并得到以下错误:

错误 WorkerConnector{id=mysql-whitelist-timestamp-source} 启动连接器时出错 (org.apache.kafka.connect.runtime.WorkerConnector:119) org.apache.kafka.connect.errors.ConnectException: 查询不能与全表复制设置。

apache-kafka apache-kafka-connect

6
推荐指数
1
解决办法
2242
查看次数

Kafka Connect:没有为连接器创建任务

我们正在使用 Debezium (MongoDB) 和 Confluent S3 连接器以分布式模式运行 Kafka Connect(Confluent Platform 5.4,即 Kafka 2.4)。通过 REST API 添加新连接器时,连接器创建为 RUNNING 状态,但不会为连接器创建任何任务。

暂停和恢复连接器无济于事。当我们停止所有工人然后再次启动它们时,任务被创建,一切都按预期运行。

该问题不是由连接器插件引起的,因为我们看到 Debezium 和 S3 连接器的行为相同。同样在调试日志中,我可以看到 Debezium 正确地从 Connector.taskConfigs() 方法返回任务配置。

有人可以告诉我该怎么做,我们可以在不重新启动工作人员的情况下添加连接器吗?谢谢。

配置详情

集群有 3 个节点,具有以下connect-distributed.properties

bootstrap.servers=kafka-broker-001:9092,kafka-broker-002:9092,kafka-broker-003:9092,kafka-broker-004:9092
group.id=tdp-QA-connect-cluster

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.topic=connect-offsets-qa
offset.storage.replication.factor=3
offset.storage.partitions=5

config.storage.topic=connect-configs-qa
config.storage.replication.factor=3

status.storage.topic=connect-status-qa
status.storage.replication.factor=3
status.storage.partitions=3

offset.flush.interval.ms=10000

rest.host.name=tdp-QA-kafka-connect-001
rest.port=10083
rest.advertised.host.name=tdp-QA-kafka-connect-001
rest.advertised.port=10083

plugin.path=/opt/kafka-connect/plugins,/usr/share/java/

security.protocol=SSL
ssl.truststore.location=/etc/kafka/ssl/kafka-connect.truststore.jks
ssl.truststore.password=<secret>
ssl.endpoint.identification.algorithm=
producer.security.protocol=SSL
producer.ssl.truststore.location=/etc/kafka/ssl/kafka-connect.truststore.jks
producer.ssl.truststore.password=<secret>
consumer.security.protocol=SSL
consumer.ssl.truststore.location=/etc/kafka/ssl/kafka-connect.truststore.jks
consumer.ssl.truststore.password=<secret>

max.request.size=20000000
max.partition.fetch.bytes=20000000
Run Code Online (Sandbox Code Playgroud)

连接器配置

Debezium 示例:

{
  "name": "qa-mongodb-comp-converter-task|1",
  "config": {
    "connector.class": …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-connect confluent-platform

6
推荐指数
1
解决办法
1653
查看次数

Kafka Producer 无法在没有 PK 的情况下验证记录并返回 InvalidRecordException

我的 kafka 制作人有错误。我使用 Debezium Kafka 连接器 V1.1.0 Final 和 Kafka 2.4.1 。对于带有 pk 的表,所有表都清楚地刷新,但不幸的是,对于没有 pk 的表,它给了我这个错误:

[2020-04-14 10:00:00,096] INFO   Exporting data from table 'public.table_0' (io.debezium.relational.RelationalSnapshotChangeEventSource:280)
[2020-04-14 10:00:00,097] INFO   For table 'public.table_0' using select statement: 'SELECT * FROM "public"."table_0"' (io.debezium.relational.RelationalSnapshotChangeEventSource:287)
[2020-04-14 10:00:00,519] INFO   Finished exporting 296 records for table 'public.table_0'; total duration '00:00:00.421' (io.debezium.relational.RelationalSnapshotChangeEventSource:330)
[2020-04-14 10:00:00,522] INFO Snapshot - Final stage (io.debezium.pipeline.source.AbstractSnapshotChangeEventSource:79)
[2020-04-14 10:00:00,523] INFO Snapshot ended with SnapshotResult [status=COMPLETED, offset=PostgresOffsetContext [sourceInfo=source_info[server='postgres'db='xxx, lsn=38/C74913C0, txId=4511542, timestamp=2020-04-14T02:00:00.517Z, snapshot=FALSE, schema=public, table=table_0], partition={server=postgres}, lastSnapshotRecord=true]] (io.debezium.pipeline.ChangeEventSourceCoordinator:90) …
Run Code Online (Sandbox Code Playgroud)

postgresql apache-kafka apache-kafka-connect debezium

6
推荐指数
1
解决办法
3760
查看次数