标签: apache-kafka-connect

Kafka-Connect-Hdfs - 无法启动 HdfsSinkConnector

我已经从http://docs.confluent.io/2.0.0/quickstart.html#quickstart下载了 kafka 连接

我正在尝试运行 hdfs 连接器。以下是设置:

连接standalone.properties:

bootstrap.servers=lvpi00658.s:9092,lvpi00659.s:9092,lvpi00660.s:9092

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

internal.key.converter=org.apache.kafka.connect.storage.StringConverter
internal.value.converter=org.apache.kafka.connect.storage.StringConverter

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
 value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
Run Code Online (Sandbox Code Playgroud)

快速入门-hdfs.properties:

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=eightball-stuff11
hdfs.url=hdfs://localhost:9000
flush.size=3
Run Code Online (Sandbox Code Playgroud)

我像这样运行 hdfs 连接器:
cd /home/fclvappi005561/confluent-3.0.0/bin
./connect-standalone ../etc/kafka-connect-hdfs/connect-standalone.properties ../etc/kafka-connect-hdfs/quickstart-hdfs.properties

但我收到一个错误:

[2016-09-12 17:19:28,039] 信息无法启动 HdfsSinkConnector: (io.confluent.connect.hdfs.HdfsSinkTask:72) org.apache.kafka.connect.errors.ConnectException: org.apache.hadoop。 security.AccessControlException: 权限被拒绝: user=lvpi005561, access=WRITE, inode="/topics":root:supergroup:drwxr-xr-x at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker. java:319) 在 org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:292) 在 org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:213)在 org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:190) 在 org.apache.hadoop.hdfs.server.namenode.FSDirectory。checkPermission(FSDirectory.java:1698) at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1682) at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory. java:1665) 在 org.apache.hadoop.hdfs.server.namenode.FSDirMkdirOp.mkdirs(FSDirMkdirOp.java:71) 在 org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:3900)在 org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:978) 在 org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.javaorg.apache.hadoop) …

hdfs apache-kafka apache-kafka-connect

1
推荐指数
1
解决办法
1752
查看次数

使用kafka接收器重命名elasticsearch中的索引

我正在使用以下水槽。问题是它将elasticsearch索引名称设置为与主题相同。我想要一个不同的 elasticseach 索引名称。我怎样才能做到这一点。我正在使用汇合4

{
  "name": "es-sink-mysql-foobar-02",
  "config": {
    "_comment": "-- standard converter stuff -- this can actually go in the worker config globally --",
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter.schema.registry.url": "http://localhost:8081",


    "_comment": "--- Elasticsearch-specific config ---",
    "_comment": "Elasticsearch server address",
    "connection.url": "http://localhost:9200",

    "_comment": "Elasticsearch mapping name. Gets created automatically if doesn't exist  ",
    "type.name": "type.name=kafka-connect",
    "index.name": "asimtest",
    "_comment": "Which topic to stream data from into Elasticsearch",
    "topics": "mysql-foobar",

    "_comment": "If the Kafka message doesn't have a key …
Run Code Online (Sandbox Code Playgroud)

elasticsearch apache-kafka kafka-consumer-api apache-kafka-connect confluent-platform

1
推荐指数
1
解决办法
3385
查看次数

kafka 中不同主题的多个连接器将连接到同一节点

我在 kafka-connect 中创建了两个 kafka 连接器,它们使用相同的连接器类,但它们监听不同的主题。

当我在节点上启动该进程时,两个连接器最终都会在此进程上创建任务。但是,我希望一个节点仅处理一个连接器/主题。如何将主题/连接器限制为单个节点?我在 connect-distributed.properties 中没有看到任何配置,其中进程可以指定要使用哪个连接器。

谢谢

apache-kafka apache-kafka-connect

1
推荐指数
1
解决办法
7182
查看次数

运行kafka连接分布式模式时地址已被使用

我通过发出“./bin/confluent start”命令启动了融合套件。然后我使用 kafka connect 将 kafka 数据汇入 mysql。

通过执行以下命令,我可以在独立模式下很好地运行 kafka connect:

./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties ./etc/kafka-connect-jdbc/adstats-jdbc-sink.properties

然后我关闭上面的命令并通过命令切换到分布式模式:

./bin/connect-distributed ./etc/schema-registry/connect-avro-distributed.properties ./etc/kafka-connect-jdbc/adstats-jdbc-sink.properties

它报告了以下异常:

[2018-08-09 14:51:56,951] 错误无法启动连接 (org.apache.kafka.connect.cli.ConnectDistributed:108) org.apache.kafka.connect.errors.ConnectException: 无法启动 REST 服务器org.apache.kafka.connect.runtime.rest.RestServer.start(RestServer.java:214) 在 org.apache.kafka.connect.runtime.Connect.start(Connect.java:53) 在 org.apache.kafka。 connect.cli.ConnectDistributed.main(ConnectDistributed.java:106) 由:java.net.BindException:地址已在 sun.nio.ch.Net.bind0(Native Method) at sun.nio.ch.Net 使用。绑定(Net.java:433)在 sun.nio.ch.Net.bind(Net.java:425)

怎么了?如何切换到使用 kafka connect 分布式模式?谢谢!

distributed apache-kafka-connect

1
推荐指数
1
解决办法
1614
查看次数

kafka-connect-jdbc 不从源获取连续的时间戳

我使用 kafka-connect-jdbc-4.0.0.jar 和 postgresql-9.4-1206-jdbc41.jar

kafka connect的connector配置

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "mode": "timestamp",
  "timestamp.column.name": "updated_at",
  "topic.prefix": "streaming.data.v2",
  "connection.password": "password",
  "connection.user": "user",
  "schema.pattern": "test",
  "query": "select * from view_source",
  "connection.url": "jdbc:postgresql://host:5432/test?currentSchema=test"
}
Run Code Online (Sandbox Code Playgroud)

我已经使用 jdbc 驱动程序配置了两个连接器一个源和另一个接收器,针对 postgresql 数据库(“PostgreSQL 9.6.9”)一切正常

我对连接器如何收集源数据有疑问,查看日志我看到执行查询之间存在 21 秒的时间差

11/1/2019 9:20:18[2019-01-11 08:20:18,985] DEBUG Checking for next block of results from TimestampIncrementingTableQuerier{name='null', query='select * from view_source', topicPrefix='streaming.data.v2', timestampColumn='updated_at', incrementingColumn='null'} (io.confluent.connect.jdbc.source.JdbcSourceTask)
11/1/2019 9:20:18[2019-01-11 08:20:18,985] DEBUG TimestampIncrementingTableQuerier{name='null', query='select * from view_source', topicPrefix='streaming.data.v2', timestampColumn='updated_at', incrementingColumn='null'} prepared SQL query: select * from view_source WHERE "updated_at" > …
Run Code Online (Sandbox Code Playgroud)

postgresql jdbc apache-kafka apache-kafka-connect confluent-platform

1
推荐指数
1
解决办法
1550
查看次数

Apache Kafka JDBC 连接器 - SerializationException:未知魔术字节

我们正在尝试使用 Confluent JDBC Sink 连接器将主题中的值写回 postgres 数据库。

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.password=xxx
tasks.max=1
topics=topic_name
auto.evolve=true
connection.user=confluent_rw
auto.create=true
connection.url=jdbc:postgresql://x.x.x.x:5432/Datawarehouse
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
Run Code Online (Sandbox Code Playgroud)

我们可以使用以下命令读取控制台中的值:

kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic topic_name
Run Code Online (Sandbox Code Playgroud)

模式存在并且值被正确反序列化,kafka-avro-console-consumer因为它没有给出错误但连接器给出了这些错误:

  {
  "name": "datawarehouse_sink",
  "connector": {
    "state": "RUNNING",
    "worker_id": "x.x.x.x:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "x.x.x.x:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:511)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:491)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.DataException: f_machinestate_sink\n\tat io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:103)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:511)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)\n\t... 13 more\nCaused by: org.apache.kafka.common.errors.SerializationException: Error …
Run Code Online (Sandbox Code Playgroud)

jdbc avro apache-kafka apache-kafka-connect

1
推荐指数
1
解决办法
1809
查看次数

Debezium 无法对大表大小进行快照

我想我可能错过了一些配置,但我们正在尝试使用 Debezium 对一个包含大约 800 万条记录的表中的所有行进行快照,一段时间后它会停止。

连接器配置是:

{
   "connector.class":"io.debezium.connector.mysql.MySqlConnector",
   "database.user":"MyUser",
   "database.server.id":"12345",
   "tasks.max":"1",
   "database.history.kafka.bootstrap.servers":"MyKafka:9092",
   "database.history.kafka.topic":"MyConnectorHistory",
   "database.server.name":"MyDbName",
   "database.port":"3306",
   "table.whitelist":"BigTable",
   "decimal.handling.mode":"double",
   "database.hostname":"***",
   "database.password":"***",
   "name":"MyConnector",
   "database.whitelist":"MyDb",
   "snapshot.mode":"initial_only",
   "connect.timeout.ms":"60000"
}

Run Code Online (Sandbox Code Playgroud)

连接器开始扫描行:

  April 24th 2019, 13:06:52.573 2019-04-24 16:06:52,569 INFO   MySQL|MyDbName|snapshot  Step 9: - 2040000 of 8609643 rows scanned from table 'MyDb.BigTable' after 00:59:29.129   [io.debezium.connector.mysql.SnapshotReader]
  ... other prints
  April 24th 2019, 12:17:28.448 2019-04-24 15:17:28,447 INFO   MySQL|MyDbName|snapshot  Step 9: - 50000 of 8609643 rows scanned from table 'MyDb.BigTable' after 00:10:05.008   [io.debezium.connector.mysql.SnapshotReader]
    April 24th 2019, 12:07:43.183   2019-04-24 15:07:43,183 INFO   MySQL|MyDbName|snapshot …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-connect debezium

1
推荐指数
1
解决办法
2661
查看次数

有没有办法配置要使用 jmx_exporter/prometheus 捕获的 kafka-connect jmx 指标?

我正在我们的 Kafka 生态系统中为 Kafka 连接设置监控。我已经为 kafka 代理启用了 JMX 导出器并且工作正常。现在我正在尝试为 kafka 连接启用 JMX 导出器。但是,从哪里开始有点不清楚。

我只能修改connect-distributed.sh以启用更改。任何指针都会是一个很好的补充。

kafka-run-class.sh已修改为能够jmx_exporter在上发出 jmx 指标http://<host>:9304/metrics

我想到卡夫卡连发出的指标http://<host>:19000/metrics一旦jmx_exporter被启用。

apache-kafka prometheus apache-kafka-connect jmx-exporter

1
推荐指数
1
解决办法
3067
查看次数

Cannot deserialize instance of `java.lang.String` in node.js server to kafka connect connection

I use curl to submit to our Kafka Connect service a JSON request message with information about the connector, it is working successfully.

$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ \"name\": \"inventory-connector\", \"config\": { \"connector.class\": \"io.debezium.connector.mysql.MySqlConnector\", \"tasks.max\": \"1\", \"database.hostname\": \"mysql\", \"database.port\": \"3306\", \"database.user\": \"debezium\", \"database.password\": \"dbz\", \"database.server.id\": \"184054\", \"database.server.name\": \"dbserver1\", \"database.whitelist\": \"inventory\", \"database.history.kafka.bootstrap.servers\": \"kafka:9092\", \"database.history.kafka.topic\": \"dbhistory.inventory\" } }'
Run Code Online (Sandbox Code Playgroud)

now I am using node.js server to send data to kafka connect server.

  var body = { …
Run Code Online (Sandbox Code Playgroud)

java node.js apache-kafka-connect

1
推荐指数
1
解决办法
938
查看次数

Kafka-confluent:如何在 JDBC 接收器连接器中使用 pk.mode=record_key 进行更新插入和删除模式?

在 Kafka confluent 中,我们如何在使用pk.mode=record_keyMySQL 表中的复合键的同时使用源作为 CSV 文件的 upsert ?使用pk.mode=record_values. 是否有任何额外的配置需要完成?

如果我尝试使用pk.mode=record_key. 错误 - 由以下原因引起org.apache.kafka.connect.errors.ConnectException::需要定义一个 PK 列,因为记录的关键架构是一种原始类型。以下是我的 JDBC 接收器连接器配置:

    {
    "name": "<name>",
    "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "<topic name>",
    "connection.url": "<url>",
    "connection.user": "<user name>",
    "connection.password": "*******",
    "insert.mode": "upsert",
    "batch.size": "50000",
    "table.name.format": "<table name>",
    "pk.mode": "record_key",
    "pk.fields": "field1,field2",
    "auto.create": "true",
    "auto.evolve": "true",
    "max.retries": "10",
    "retry.backoff.ms": "3000",
    "mode": "bulk",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schemas.enable": "true",
    "value.converter.schema.registry.url": "http://localhost:8081"
  }
}
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-connect confluent-platform

1
推荐指数
1
解决办法
2793
查看次数