我正在分布式模式下运行 CP3.2,甚至使用“tasks.max”定义的一些连接器:“1”也具有任务“UNASSIGNED”状态。我增加了分配给工作人员的内存并重新启动工作人员解决了我的问题,或者添加一个工作人员解决了这个问题。
如果“tasks.max”> 1 有一些任务处于“未分配”状态,这对我来说没问题,但如果我只定义一个任务,它应该处于“运行”状态。
但我需要了解任务在什么情况下会进入“未分配”状态以及如何解决此问题(使其运行)。
问候,
阿拉迪亚
我刚刚在两个实例的集群(2 台机器上的 2 个容器)上部署了我的 Kafka Connect(我只使用连接源到 MQTT)应用程序,现在它似乎进入了一种重新平衡循环,我有一个开始时有一点数据,但没有出现新数据。这是我在日志中得到的。
[2017-08-11 07:27:35,810] INFO Joined group and got assignment: Assignment{error=0, leader='connect-1-592bcc91-9d99-4c54-b707-3f52d0f8af50', leaderUrl='http:// 10.120.233.78:9040/', offset=2, connectorIds=[SourceConnector1], taskIds=[]} (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1009)
[2017-08-11 07:27:35,810] WARN Catching up to assignment's config offset. (org.apache.kafka.connect.runtime.distributed.DistributedHerder:679)
[2017-08-11 07:27:35,810] INFO Current config state offset 1 is behind group assignment 2, reading to end of config log (org.apache.kafka.connect.runtime.distributed.DistributedHerder:723)
[2017-08-11 07:27:36,310] INFO Finished reading to end of log and updated config snapshot, new config log offset: 1 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:727)
[2017-08-11 07:27:36,310] INFO Current config state offset 1 …Run Code Online (Sandbox Code Playgroud) 我试图弄清楚是否可以使用 Kafka Connect 将存储为字符串的 JSON 值转换为实际的 JSON 结构。
我尝试寻找这样的转变,但没有找到。例如,这可能是来源:
{
"UserID":2105058535,
"DocumentID":2105058535,
"RandomJSON":"{\"Tags\":[{\"TagID\":1,\"TagName\":\"Java\"},{\"TagID\":2,\"TagName\":\"Kafka\"}]}"
}
Run Code Online (Sandbox Code Playgroud)
这就是我的目标:
{
"UserID":2105058535,
"DocumentID":2105058535,
"RandomJSON":{
"Tags":[
{
"TagID":1,
"TagName":"Java"
},
{
"TagID":2,
"TagName":"Kafka"
}
]
}
}
Run Code Online (Sandbox Code Playgroud)
我正在尝试对 Elasticsearch 接收器连接器进行这些转换(如果有影响的话)。
我知道我可以将 Logstash 与 JSON 过滤器一起使用来做到这一点,但我想知道是否有一种方法可以仅使用 Kafka Connect 来做到这一点。
我正在使用 Kafka 连接 HDFS。当我尝试运行连接器时,出现以下异常:
错误无法创建 WAL 编写器:无法为客户端 [IP] 的 [DFSClient_NONMAPREDUCE_208312334_41] 创建文件 [/path/log],因为该文件已由 [DFSClient_NONMAPREDUCE_165323242_41] 创建
请问有什么建议吗?
有人可以解释一下下面配置中的partition.duration.ms和flushsize的意义吗?设置这些属性背后的想法应该是什么?
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"s3.region": "eu-central-1",
"partition.duration.ms": "1000",
"topics.dir": "root_bucket",
"flush.size": "10",
"topics": "TEST_SRV",
"tasks.max": "1",
"s3.part.size": "5242880",
"timezone": "UTC",
"locale": "US",
"key.converter.schemas.enable": "true",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"value.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"s3.bucket.name": "events-dev-s3",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"path.format": "'year'-YYYY/'month'-MM/'day'-dd/'hour'-HH",
"timestamp.extractor": "RecordField",
"timestamp.field": "event_data.created_at"
Run Code Online (Sandbox Code Playgroud) 我正在运行一个数据管道,以便通过 jdbc connect 将数据从 sql db 读取到 kafka 主题中我使用 ES 的 kafka 接收器连接器将这些数据接收到 Elasticsearch 中
我需要重置这条管道。为此,我想重置正在侦听 jdbc 连接主题的消费者组。所以我运行以下命令
kafka-consumer-groups --reset-offsets --to-earliest --all-topics --execute --group mygroup --bootstrap-server myserver:9092
Run Code Online (Sandbox Code Playgroud)
但我收到这个错误
Error: Assignments can only be reset if the group 'mygroup' is inactive, but the current state is Stable.
Run Code Online (Sandbox Code Playgroud)
我停止了连接以使该组处于非活动状态,但这不起作用。所以我的问题是,如何让我的小组不活跃
我正在使用融合的 kafka 连接服务,但它没有写入日志/var/log/kafka。如何配置它以便它写入日志/var/log/kafka?
目前 /var/log/kafka 只有以下日志文件 -
-rw-r--r-- 1 cp-kafka confluent 0 Sep 20 14:51 kafka-request.log
-rw-r--r-- 1 cp-kafka confluent 0 Sep 20 14:51 kafka-authorizer.log
-rw-r--r-- 1 cp-kafka confluent 1622 Nov 13 15:43 log-cleaner.log
-rw-r--r-- 1 cp-kafka confluent 7611 Nov 13 20:57 state-change.log
-rw-r--r-- 1 cp-kafka confluent 1227 Nov 14 11:13 server.log
-rw-r--r-- 1 cp-kafka confluent 16683 Nov 14 11:13 controller.log
Run Code Online (Sandbox Code Playgroud)
当进一步检查时,我发现日志写入/var/log/messages(我不想要)。看看下面connect-log4j.properties:
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
log4j.logger.org.apache.zookeeper=ERROR
log4j.logger.org.I0Itec.zkclient=ERROR …Run Code Online (Sandbox Code Playgroud) 我有一个 Kafka Connect 接收器正在运行。我想监控这个延迟。
我可以通过 shell 进入代理并使用kafka-consumer-groups如下工具来手动获取延迟:
unset JMX_PORT; /usr/bin/kafka-consumer-groups --bootstrap-server localhost:9092 --group connect-<my-kafka-connect-connector> --describe
Run Code Online (Sandbox Code Playgroud)
这会给我类似的东西:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
connect-<my-kafka-connect-connector> <my-topic> 0 1414248272 2775658553 1361410281 connector-consumer-<my-kafka-connect-connector>-<uuid> /<my-host-ip> connector-consumer-<my-kafka-connect-connector>-0
Run Code Online (Sandbox Code Playgroud)
这就是我想要的滞后信息,但我希望将其放在 Prometheus 指标中,我可以将其放在仪表板上并进行监控和设置警报。
我正在获取 Kafka Broker 指标和 Kafka Connect 指标,这两个指标似乎都没有此信息。curl我已经用和遍历了 Prometheus 指标输出,grep但不存在此信息。
我通过官方 Confluence Helm 图表(https://github.com/confluenceinc/cp-helm-charts/tree/master/charts/cp-kafka-connect)运行 Kafka Connect,并使用默认的 Prometheus 指标导出。这有效,我可以获得基本指标,但没有有关滞后的信息:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
connect-<my-kafka-connect-connector> <my-topic> 0 1414248272 2775658553 1361410281 connector-consumer-<my-kafka-connect-connector>-<uuid> /<my-host-ip> connector-consumer-<my-kafka-connect-connector>-0 …Run Code Online (Sandbox Code Playgroud) 我正在尝试在docker的帮助下使用kafka debezium(Kafka流)将一个数据库的表数据下沉到另一个数据库。数据库流工作正常。但流式数据接收另一个 MySQL DB 进程时出现错误。
对于我的连接器接收器配置如下。
{
"name": "mysql_sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"topics": "mysql-connect.kafka_test.employee",
"connection.url": "jdbc:mysql://localhost/kafka_test_1&user=debezium&password=xxxxx",
"auto.create": "true",
"auto.evolve": "true",
"insert.mode": "upsert",
"pk.fields": "id",
"pk.mode": "record_value",
"errors.tolerance": "all",
"errors.log.enable":"true",
"errors.log.include.messages":"true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"name": "mysql_sink"
}
}
Run Code Online (Sandbox Code Playgroud)
但我收到错误。
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
java.util.concurrent.FutureTask.run(FutureTask.java:266)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: No suitable driver found for jdbc:mysql://localhost/kafka_test_1&user=debezium&password=xxxxx
io.confluent.connect.jdbc.util.CachedConnectionProvider.getValidConnection(CachedConnectionProvider.java:59)
io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:52)
io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:66)
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)\n\t... 10 more\nCaused by: java.sql.SQLException: No …Run Code Online (Sandbox Code Playgroud) 我成功安装了Postgres Debezium CDC。现在,我能够捕获数据库发生的所有更改。但问题是“之前”字段始终为空。因此,如果我插入一条记录,(id = 1, name = Bill)我就会从 Kafka 获取以下数据:
'payload': {'before': None, 'after': {'id': 1, 'name': 'Bill'}, ...
Run Code Online (Sandbox Code Playgroud)
但如果我像这样更新记录:
UPDATE mytable set name = 'Bob' WHERE id = 1
Run Code Online (Sandbox Code Playgroud)
我从卡夫卡那里得到这个:
'payload': {'before': None, 'after': {'id': 1, 'name': 'Bob'}, ...
Run Code Online (Sandbox Code Playgroud)
这就是我配置连接器的方式:
'payload': {'before': None, 'after': {'id': 1, 'name': 'Bill'}, ...
Run Code Online (Sandbox Code Playgroud)
这是什么问题?我该如何解决?