I downloaded Kafka Connect from http://docs.confluent.io/2.0.0/quickstart.html#quickstart
I am trying to run the HDFS connector. Here is the setup:
connect-standalone.properties:
bootstrap.servers=lvpi00658.s:9092,lvpi00659.s:9092,lvpi00660.s:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
internal.key.converter=org.apache.kafka.connect.storage.StringConverter
internal.value.converter=org.apache.kafka.connect.storage.StringConverter
offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
and quickstart-hdfs.properties:
name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=eightball-stuff11
hdfs.url=hdfs://localhost:9000
flush.size=3
I run the HDFS connector like this:
cd /home/fclvappi005561/confluent-3.0.0/bin
./connect-standalone ../etc/kafka-connect-hdfs/connect-standalone.properties ../etc/kafka-connect-hdfs/quickstart-hdfs.properties
But I get an error:
[2016-09-12 17:19:28,039] INFO Couldn't start HdfsSinkConnector: (io.confluent.connect.hdfs.HdfsSinkTask:72)
org.apache.kafka.connect.errors.ConnectException: org.apache.hadoop.security.AccessControlException: Permission denied: user=lvpi005561, access=WRITE, inode="/topics":root:supergroup:drwxr-xr-x
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:319)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:292)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:213)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:190)
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1698)
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1682)
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory.java:1665)
    at org.apache.hadoop.hdfs.server.namenode.FSDirMkdirOp.mkdirs(FSDirMkdirOp.java:71)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:3900)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:978)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java…) …
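The trace points at HDFS permissions rather than at Connect itself: the worker runs as OS user lvpi005561, while /topics is owned by root:supergroup with mode drwxr-xr-x. A minimal sketch of one way to unblock it, assuming HDFS superuser access and that the connector uses the default /topics and /logs directories:

# Run as an HDFS superuser (typically the user that started the NameNode).
# Pre-create the directories the HDFS sink writes to and hand them to the
# OS user that runs the Connect worker (lvpi005561 here).
hdfs dfs -mkdir -p /topics /logs
hdfs dfs -chown -R lvpi005561:supergroup /topics /logs

# Local-test alternative only: impersonate the directory owner when starting
# the worker (clusters with simple authentication honor HADOOP_USER_NAME).
export HADOOP_USER_NAME=root
./connect-standalone ../etc/kafka-connect-hdfs/connect-standalone.properties ../etc/kafka-connect-hdfs/quickstart-hdfs.properties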
I am using the following sink. The problem is that it sets the Elasticsearch index name to the same value as the topic name, and I want a different Elasticsearch index name. How can I do that? I am using Confluent 4.
{
"name": "es-sink-mysql-foobar-02",
"config": {
"_comment": "-- standard converter stuff -- this can actually go in the worker config globally --",
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter.schema.registry.url": "http://localhost:8081",
"_comment": "--- Elasticsearch-specific config ---",
"_comment": "Elasticsearch server address",
"connection.url": "http://localhost:9200",
"_comment": "Elasticsearch mapping name. Gets created automatically if doesn't exist ",
"type.name": "type.name=kafka-connect",
"index.name": "asimtest",
"_comment": "Which topic to stream data from into Elasticsearch",
"topics": "mysql-foobar",
"_comment": "If the Kafka message doesn't have a key …Run Code Online (Sandbox Code Playgroud) elasticsearch apache-kafka kafka-consumer-api apache-kafka-connect confluent-platform
I have created two Kafka connectors in kafka-connect that use the same connector class, but they listen to different topics.
When I start the worker process on a node, both connectors end up creating tasks on that process. However, I would like one node to handle only one connector/topic. How can I restrict a topic/connector to a single node? I don't see any configuration in connect-distributed.properties where a process could specify which connector to run.
Thanks
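A distributed Connect cluster balances connectors and tasks across every worker that shares the same group.id, and as far as I know there is no per-connector node pinning within one cluster. One hedged workaround is to run each connector in its own single-node "cluster" by giving the workers different group IDs and internal topics; the property names below are the standard distributed-worker settings, the values are only illustrative:

# node 1: connect-distributed-a.properties
group.id=connect-cluster-a
offset.storage.topic=connect-offsets-a
config.storage.topic=connect-configs-a
status.storage.topic=connect-status-a

# node 2: connect-distributed-b.properties
group.id=connect-cluster-b
offset.storage.topic=connect-offsets-b
config.storage.topic=connect-configs-b
status.storage.topic=connect-status-b

Each connector is then submitted to the REST endpoint of the worker that should own it, so one connector/topic stays on node 1 and the other on node 2.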
I started the Confluent suite by issuing the ./bin/confluent start command. Then I used Kafka Connect to sink Kafka data into MySQL.
I can run Kafka Connect in standalone mode just fine with the following command:
./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties ./etc/kafka-connect-jdbc/adstats-jdbc-sink.properties
Then I shut down the command above and switched to distributed mode with this command:
./bin/connect-distributed ./etc/schema-registry/connect-avro-distributed.properties ./etc/kafka-connect-jdbc/adstats-jdbc-sink.properties
It reported the following exception:
[2018-08-09 14:51:56,951] ERROR Failed to start Connect (org.apache.kafka.connect.cli.ConnectDistributed:108)
org.apache.kafka.connect.errors.ConnectException: Unable to start REST server
    at org.apache.kafka.connect.runtime.rest.RestServer.start(RestServer.java:214)
    at org.apache.kafka.connect.runtime.Connect.start(Connect.java:53)
    at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:106)
Caused by: java.net.BindException: Address already in use
    at sun.nio.ch.Net.bind0(Native Method)
    at sun.nio.ch.Net.bind(Net.java:433)
    at sun.nio.ch.Net.bind(Net.java:425)
What is wrong? How do I switch to running Kafka Connect in distributed mode? Thanks!
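Two things stand out here. First, the BindException means another process is already bound to the worker's REST port, and confluent start itself launches a Connect worker, so that instance has to be stopped (or rest.port changed) before starting one by hand. Second, connect-distributed does not take connector .properties files on the command line; in distributed mode connectors are created through the REST API. A rough sketch, assuming the default REST port 8083 and that adstats-jdbc-sink.properties is converted to JSON by hand (the "..." values are placeholders to be copied from that file):

# Stop the Connect worker that `confluent start` already launched.
./bin/confluent stop connect

# Start the distributed worker with only the worker config.
./bin/connect-distributed ./etc/schema-registry/connect-avro-distributed.properties

# Create the sink connector over REST.
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors \
  -d '{"name": "adstats-jdbc-sink", "config": {"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "topics": "...", "connection.url": "..."}}'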
I am using kafka-connect-jdbc-4.0.0.jar and postgresql-9.4-1206-jdbc41.jar.
The connector configuration for Kafka Connect:
{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"mode": "timestamp",
"timestamp.column.name": "updated_at",
"topic.prefix": "streaming.data.v2",
"connection.password": "password",
"connection.user": "user",
"schema.pattern": "test",
"query": "select * from view_source",
"connection.url": "jdbc:postgresql://host:5432/test?currentSchema=test"
}
I have configured two connectors with the JDBC driver, one source and one sink, against a PostgreSQL database ("PostgreSQL 9.6.9"), and everything works fine.
I have a question about how the source connector collects the data: looking at the logs, I see a 21-second gap between query executions.
11/1/2019 9:20:18[2019-01-11 08:20:18,985] DEBUG Checking for next block of results from TimestampIncrementingTableQuerier{name='null', query='select * from view_source', topicPrefix='streaming.data.v2', timestampColumn='updated_at', incrementingColumn='null'} (io.confluent.connect.jdbc.source.JdbcSourceTask)
11/1/2019 9:20:18[2019-01-11 08:20:18,985] DEBUG TimestampIncrementingTableQuerier{name='null', query='select * from view_source', topicPrefix='streaming.data.v2', timestampColumn='updated_at', incrementingColumn='null'} prepared SQL query: select * from view_source WHERE "updated_at" > …
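For what it's worth, the cadence at which the JDBC source re-runs the query is controlled by the connector's polling settings, mainly poll.interval.ms (default 5000 ms, if I recall correctly) and, in timestamp mode, timestamp.delay.interval.ms; batch.max.rows caps how many rows come back per batch. A hedged sketch of setting them explicitly in the source config above (values are illustrative only):

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "mode": "timestamp",
  "timestamp.column.name": "updated_at",
  "query": "select * from view_source",
  "poll.interval.ms": "5000",
  "timestamp.delay.interval.ms": "0",
  "batch.max.rows": "100"
}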
We are trying to use the Confluent JDBC Sink connector to write values from a topic back to a Postgres database.
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.password=xxx
tasks.max=1
topics=topic_name
auto.evolve=true
connection.user=confluent_rw
auto.create=true
connection.url=jdbc:postgresql://x.x.x.x:5432/Datawarehouse
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
We can read the values in the console with the following command:
kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic topic_name
The schema exists and the values are deserialized correctly by kafka-avro-console-consumer, since it reports no errors, but the connector gives these errors:
{
"name": "datawarehouse_sink",
"connector": {
"state": "RUNNING",
"worker_id": "x.x.x.x:8083"
},
"tasks": [
{
"id": 0,
"state": "FAILED",
"worker_id": "x.x.x.x:8083",
"trace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:511)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:491)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.DataException: f_machinestate_sink\n\tat io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:103)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:511)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)\n\t... 13 more\nCaused by: org.apache.kafka.common.errors.SerializationException: Error …Run Code Online (Sandbox Code Playgroud) 我想我可能错过了一些配置,但我们正在尝试使用 Debezium 对一个包含大约 800 万条记录的表中的所有行进行快照,一段时间后它会停止。
连接器配置是:
{
"connector.class":"io.debezium.connector.mysql.MySqlConnector",
"database.user":"MyUser",
"database.server.id":"12345",
"tasks.max":"1",
"database.history.kafka.bootstrap.servers":"MyKafka:9092",
"database.history.kafka.topic":"MyConnectorHistory",
"database.server.name":"MyDbName",
"database.port":"3306",
"table.whitelist":"BigTable",
"decimal.handling.mode":"double",
"database.hostname":"***",
"database.password":"***",
"name":"MyConnector",
"database.whitelist":"MyDb",
"snapshot.mode":"initial_only",
"connect.timeout.ms":"60000"
}
The connector starts scanning rows:
April 24th 2019, 13:06:52.573 2019-04-24 16:06:52,569 INFO MySQL|MyDbName|snapshot Step 9: - 2040000 of 8609643 rows scanned from table 'MyDb.BigTable' after 00:59:29.129 [io.debezium.connector.mysql.SnapshotReader]
... other prints
April 24th 2019, 12:17:28.448 2019-04-24 15:17:28,447 INFO MySQL|MyDbName|snapshot Step 9: - 50000 of 8609643 rows scanned from table 'MyDb.BigTable' after 00:10:05.008 [io.debezium.connector.mysql.SnapshotReader]
April 24th 2019, 12:07:43.183 2019-04-24 15:07:43,183 INFO MySQL|MyDbName|snapshot …
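If the scan stalls rather than failing with an explicit error, the knobs usually worth looking at are the Debezium MySQL snapshot settings together with the MySQL server timeouts (wait_timeout, net_write_timeout). A hedged sketch of the connector-side part; snapshot.fetch.size and min.row.count.to.stream.results are documented Debezium MySQL options, but the values here are only illustrative:

{
  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  "snapshot.mode": "initial_only",
  "snapshot.fetch.size": "10000",
  "min.row.count.to.stream.results": "1000",
  "connect.timeout.ms": "60000"
}

It is also worth checking the Connect worker log around the time the scan stops for rebalances or task restarts, since, as far as I understand, an interrupted initial snapshot starts over from the beginning.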
I am setting up monitoring for Kafka Connect in our Kafka ecosystem. I have already enabled the JMX exporter for the Kafka brokers and it works fine. Now I am trying to enable the JMX exporter for Kafka Connect, but it is a bit unclear where to start.
As far as I can tell, I can only modify connect-distributed.sh to enable this. Any pointers would be appreciated.
kafka-run-class.sh has been modified so that jmx_exporter exposes JMX metrics at http://<host>:9304/metrics.
I expect Kafka Connect to expose its metrics at http://<host>:19000/metrics once jmx_exporter is enabled.
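Since connect-distributed.sh ends up calling kafka-run-class.sh, which honors the KAFKA_OPTS environment variable, one way to attach the Prometheus jmx_exporter java agent without editing either script is sketched below; the jar path, the YAML config file and port 19000 are assumptions for illustration:

# Attach the Prometheus JMX exporter as a java agent to the Connect worker;
# <port>:<config.yml> makes the agent serve /metrics on that port.
export KAFKA_OPTS="-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=19000:/opt/jmx_exporter/kafka-connect.yml"

./bin/connect-distributed ./etc/kafka/connect-distributed.properties

# Verify the endpoint once the worker is up.
curl http://<host>:19000/metrics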
I use curl to submit a JSON request with the connector information to our Kafka Connect service, and it works successfully.
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.whitelist": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.inventory" } }'
Now I am using a Node.js server to send the data to the Kafka Connect server.
var body = { …
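The snippet is truncated here, but for reference, a minimal Node.js sketch of posting the same connector config using only the built-in http module; JSON.stringify takes care of all quoting, so no manual backslash escaping is needed (host, port and the config values simply mirror the curl call above):

const http = require('http');

// Same payload as the curl example; JSON.stringify handles the quoting.
const body = {
  name: 'inventory-connector',
  config: {
    'connector.class': 'io.debezium.connector.mysql.MySqlConnector',
    'tasks.max': '1',
    'database.hostname': 'mysql',
    'database.port': '3306',
    'database.user': 'debezium',
    'database.password': 'dbz',
    'database.server.id': '184054',
    'database.server.name': 'dbserver1',
    'database.whitelist': 'inventory',
    'database.history.kafka.bootstrap.servers': 'kafka:9092',
    'database.history.kafka.topic': 'dbhistory.inventory'
  }
};

const payload = JSON.stringify(body);
const req = http.request(
  {
    host: 'localhost',
    port: 8083,
    path: '/connectors/',
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Content-Length': Buffer.byteLength(payload)
    }
  },
  (res) => {
    let data = '';
    res.on('data', (chunk) => (data += chunk));
    res.on('end', () => console.log(res.statusCode, data));
  }
);
req.on('error', console.error);
req.write(payload);
req.end();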
In Confluent Kafka, how can we do an upsert with a CSV file as the source while using a composite key for a MySQL table with pk.mode=record_key? It works with pk.mode=record_value. Is there any additional configuration that needs to be done?
If I try with pk.mode=record_key I get an error - Caused by: org.apache.kafka.connect.errors.ConnectException: a PK column must be defined because the record key schema is a primitive type. Below is my JDBC sink connector configuration:
{
"name": "<name>",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "<topic name>",
"connection.url": "<url>",
"connection.user": "<user name>",
"connection.password": "*******",
"insert.mode": "upsert",
"batch.size": "50000",
"table.name.format": "<table name>",
"pk.mode": "record_key",
"pk.fields": "field1,field2",
"auto.create": "true",
"auto.evolve": "true",
"max.retries": "10",
"retry.backoff.ms": "3000",
"mode": "bulk",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schemas.enable": "true",
"value.converter.schema.registry.url": "http://localhost:8081"
}
}
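The error says the record key reaching the sink is a primitive: with key.converter set to StringConverter the key is a plain string, while pk.mode=record_key with several pk.fields needs a structured key. Two hedged options against the config above, with field1/field2 standing in for the real column names. The simplest is to take the primary-key columns from the value instead:

"insert.mode": "upsert",
"pk.mode": "record_value",
"pk.fields": "field1,field2",

Alternatively, to keep pk.mode=record_key, a struct key can be built inside the sink with the ValueToKey transform:

"transforms": "createKey",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "field1,field2",
"pk.mode": "record_key",
"pk.fields": "field1,field2",

As a side note, "mode": "bulk" looks like a JDBC source setting and is, as far as I can tell, ignored by the sink connector.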