标签: apache-kafka-connect

连接器中的任务何时转为未分配状态?

我正在分布式模式下运行 CP3.2,甚至使用“tasks.max”定义的一些连接器:“1”也具有任务“UNASSIGNED”状态。我增加了分配给工作人员的内存并重新启动工作人员解决了我的问题,或者添加一个工作人员解决了这个问题。

如果“tasks.max”> 1 有一些任务处于“未分配”状态,这对我来说没问题,但如果我只定义一个任务,它应该处于“运行”状态。

但我需要了解任务在什么情况下会进入“未分配”状态以及如何解决此问题(使其运行)。

问候,

阿拉迪亚

apache-kafka apache-kafka-connect

6
推荐指数
1
解决办法
1万
查看次数

Kafka Connect 进入重新平衡循环

我刚刚在两个实例的集群(2 台机器上的 2 个容器)上部署了我的 Kafka Connect(我只使用连接源到 MQTT)应用程序,现在它似乎进入了一种重新平衡循环,我有一个开始时有一点数据,但没有出现新数据。这是我在日志中得到的。

[2017-08-11 07:27:35,810] INFO Joined group and got assignment: Assignment{error=0, leader='connect-1-592bcc91-9d99-4c54-b707-3f52d0f8af50', leaderUrl='http:// 10.120.233.78:9040/', offset=2, connectorIds=[SourceConnector1], taskIds=[]} (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1009)
[2017-08-11 07:27:35,810] WARN Catching up to assignment's config offset. (org.apache.kafka.connect.runtime.distributed.DistributedHerder:679)
[2017-08-11 07:27:35,810] INFO Current config state offset 1 is behind group assignment 2, reading to end of config log (org.apache.kafka.connect.runtime.distributed.DistributedHerder:723)
[2017-08-11 07:27:36,310] INFO Finished reading to end of log and updated config snapshot, new config log offset: 1 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:727)
[2017-08-11 07:27:36,310] INFO Current config state offset 1 …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-consumer-api apache-kafka-connect

6
推荐指数
1
解决办法
1283
查看次数

Kafka Connect 将 JSON 字符串转换为实际 JSON

我试图弄清楚是否可以使用 Kafka Connect 将存储为字符串的 JSON 值转换为实际的 JSON 结构。

我尝试寻找这样的转变,但没有找到。例如,这可能是来源:

{
  "UserID":2105058535,
  "DocumentID":2105058535,
  "RandomJSON":"{\"Tags\":[{\"TagID\":1,\"TagName\":\"Java\"},{\"TagID\":2,\"TagName\":\"Kafka\"}]}"
}
Run Code Online (Sandbox Code Playgroud)

这就是我的目标:

{
  "UserID":2105058535,
  "DocumentID":2105058535,
  "RandomJSON":{
    "Tags":[
      {
        "TagID":1,
        "TagName":"Java"
      },
      {
        "TagID":2,
        "TagName":"Kafka"
      }
    ]
  }
}
Run Code Online (Sandbox Code Playgroud)

我正在尝试对 Elasticsearch 接收器连接器进行这些转换(如果有影响的话)。

我知道我可以将 Logstash 与 JSON 过滤器一起使用来做到这一点,但我想知道是否有一种方法可以仅使用 Kafka Connect 来做到这一点。

json apache-kafka-connect

6
推荐指数
1
解决办法
5331
查看次数

Kafka连接HDFS接收器错误无法创建WAL

我正在使用 Kafka 连接 HDFS。当我尝试运行连接器时,出现以下异常:

错误无法创建 WAL 编写器:无法为客户端 [IP] 的 [DFSClient_NONMAPREDUCE_208312334_41] 创建文件 [/path/log],因为该文件已由 [DFSClient_NONMAPREDUCE_165323242_41] 创建

请问有什么建议吗?

hdfs apache-kafka apache-kafka-connect confluent-platform

6
推荐指数
1
解决办法
838
查看次数

Kafka connect属性partition.duration.ms和flush size之间的关系?

有人可以解释一下下面配置中的partition.duration.ms和flushsize的意义吗?设置这些属性背后的想法应该是什么?

"connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "s3.region": "eu-central-1",
  "partition.duration.ms": "1000",
  "topics.dir": "root_bucket",
  "flush.size": "10",
  "topics": "TEST_SRV",
  "tasks.max": "1",
  "s3.part.size": "5242880",
  "timezone": "UTC",
  "locale": "US",
  "key.converter.schemas.enable": "true",
  "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
  "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
  "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
  "value.converter.schemas.enable": "false",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "s3.bucket.name": "events-dev-s3",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "path.format": "'year'-YYYY/'month'-MM/'day'-dd/'hour'-HH",
  "timestamp.extractor": "RecordField",
  "timestamp.field": "event_data.created_at"
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-connect

6
推荐指数
1
解决办法
3546
查看次数

Kafka:使消费者组处于非活动状态

我正在运行一个数据管道,以便通过 jdbc connect 将数据从 sql db 读取到 kafka 主题中我使用 ES 的 kafka 接收器连接器将这些数据接收到 Elasticsearch 中

我需要重置这条管道。为此,我想重置正在侦听 jdbc 连接主题的消费者组。所以我运行以下命令

kafka-consumer-groups --reset-offsets --to-earliest --all-topics --execute --group mygroup --bootstrap-server myserver:9092
Run Code Online (Sandbox Code Playgroud)

但我收到这个错误

Error: Assignments can only be reset if the group 'mygroup' is inactive, but the current state is Stable.
Run Code Online (Sandbox Code Playgroud)

我停止了连接以使该组处于非活动状态,但这不起作用。所以我的问题是,如何让我的小组不活跃

apache-kafka kafka-consumer-api apache-kafka-connect

6
推荐指数
1
解决办法
1万
查看次数

如何配置Confluence Platform Kafka连接日志?

我正在使用融合的 kafka 连接服务,但它没有写入日志/var/log/kafka。如何配置它以便它写入日志/var/log/kafka

目前 /var/log/kafka 只有以下日志文​​件 -

-rw-r--r-- 1 cp-kafka confluent     0 Sep 20 14:51 kafka-request.log
-rw-r--r-- 1 cp-kafka confluent     0 Sep 20 14:51 kafka-authorizer.log
-rw-r--r-- 1 cp-kafka confluent  1622 Nov 13 15:43 log-cleaner.log
-rw-r--r-- 1 cp-kafka confluent  7611 Nov 13 20:57 state-change.log
-rw-r--r-- 1 cp-kafka confluent  1227 Nov 14 11:13 server.log
-rw-r--r-- 1 cp-kafka confluent 16683 Nov 14 11:13 controller.log
Run Code Online (Sandbox Code Playgroud)

当进一步检查时,我发现日志写入/var/log/messages(我不想要)。看看下面connect-log4j.properties

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
log4j.logger.org.apache.zookeeper=ERROR
log4j.logger.org.I0Itec.zkclient=ERROR …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-connect confluent-platform

6
推荐指数
1
解决办法
5951
查看次数

Kafka Connect 消费者组滞后指标?

我有一个 Kafka Connect 接收器正在运行。我想监控这个延迟。

我可以通过 shell 进入代理并使用kafka-consumer-groups如下工具来手动获取延迟:

unset JMX_PORT; /usr/bin/kafka-consumer-groups --bootstrap-server localhost:9092 --group connect-<my-kafka-connect-connector> --describe
Run Code Online (Sandbox Code Playgroud)

这会给我类似的东西:

GROUP                                 TOPIC                 PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                                             HOST            CLIENT-ID
connect-<my-kafka-connect-connector>  <my-topic>            0          1414248272      2775658553      1361410281      connector-consumer-<my-kafka-connect-connector>-<uuid>                                  /<my-host-ip>   connector-consumer-<my-kafka-connect-connector>-0
Run Code Online (Sandbox Code Playgroud)

这就是我想要的滞后信息,但我希望将其放在 Prometheus 指标中,我可以将其放在仪表板上并进行监控和设置警报。

我正在获取 Kafka Broker 指标和 Kafka Connect 指标,这两个指标似乎都没有此信息。curl我已经用和遍历了 Prometheus 指标输出,grep但不存在此信息。

我通过官方 Confluence Helm 图表(https://github.com/confluenceinc/cp-helm-charts/tree/master/charts/cp-kafka-connect)运行 Kafka Connect,并使用默认的 Prometheus 指标导出。这有效,我可以获得基本指标,但没有有关滞后的信息:

GROUP                                 TOPIC                 PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                                             HOST            CLIENT-ID
connect-<my-kafka-connect-connector>  <my-topic>            0          1414248272      2775658553      1361410281      connector-consumer-<my-kafka-connect-connector>-<uuid>                                  /<my-host-ip>   connector-consumer-<my-kafka-connect-connector>-0 …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-connect

6
推荐指数
0
解决办法
1804
查看次数

Kafka Connect JDBC Sink 连接器 - java.sql.SQLException:找不到合适的驱动程序

我正在尝试在docker的帮助下使用kafka debezium(Kafka流)将一个数据库的表数据下沉到另一个数据库。数据库流工作正常。但流式数据接收另一个 MySQL DB 进程时出现错误。

对于我的连接器接收器配置如下。

 {
  "name": "mysql_sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "mysql-connect.kafka_test.employee",
    "connection.url": "jdbc:mysql://localhost/kafka_test_1&user=debezium&password=xxxxx",
    "auto.create": "true",
    "auto.evolve": "true",
    "insert.mode": "upsert",
    "pk.fields": "id",
    "pk.mode": "record_value",
    "errors.tolerance": "all",
    "errors.log.enable":"true",
    "errors.log.include.messages":"true",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "name": "mysql_sink"
  }
}
Run Code Online (Sandbox Code Playgroud)

但我收到错误。

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
java.util.concurrent.FutureTask.run(FutureTask.java:266)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: No suitable driver found for jdbc:mysql://localhost/kafka_test_1&user=debezium&password=xxxxx
io.confluent.connect.jdbc.util.CachedConnectionProvider.getValidConnection(CachedConnectionProvider.java:59)
io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:52)
io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:66)
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)\n\t... 10 more\nCaused by: java.sql.SQLException: No …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-connect confluent-platform

6
推荐指数
1
解决办法
4171
查看次数

Postgres Debezium 不发布记录的先前状态

我成功安装了Postgres Debezium CDC。现在,我能够捕获数据库发生的所有更改。但问题是“之前”字段始终为空。因此,如果我插入一条记录,(id = 1, name = Bill)我就会从 Kafka 获取以下数据:

'payload': {'before': None, 'after': {'id': 1, 'name': 'Bill'}, ...
Run Code Online (Sandbox Code Playgroud)

但如果我像这样更新记录:

UPDATE mytable set name = 'Bob' WHERE id = 1
Run Code Online (Sandbox Code Playgroud)

我从卡夫卡那里得到这个:

'payload': {'before': None, 'after': {'id': 1, 'name': 'Bob'}, ...
Run Code Online (Sandbox Code Playgroud)

这就是我配置连接器的方式:

'payload': {'before': None, 'after': {'id': 1, 'name': 'Bill'}, ...
Run Code Online (Sandbox Code Playgroud)

这是什么问题?我该如何解决?

postgresql apache-kafka-connect debezium

6
推荐指数
1
解决办法
5193
查看次数