如何通过 Kafka Connector 将数据从 Kafka 流式传输到 MongoDB

Vu *_*Anh 7 mongodb apache-kafka apache-kafka-connect

我想使用 Kafka Connector 将数据从 Kafka 流式传输到 MongoDB。我找到了这个https://github.com/hpgrahsl/kafka-connect-mongodb。但是没有步骤可做。

谷歌搜索后,它似乎导致了我不想使用的Confluent Platform。

任何人都可以分享我的文档/指南,如何在不使用 Confluent 平台或其他 Kafka 连接器的情况下使用kafka-connect-mongodb将数据从 Kafka 流式传输到 MongoDB?

先感谢您。


我试过的

Step1:我mongo-kafka-connect-0.1-all.jarmaven central下载

Step2:将jar文件复制到一个新文件夹plugins里面kafka(我在windows上用的是Kafka,所以目录是D:\git\1.libraries\kafka_2.12-2.2.0\plugins

步骤 3:connect-standalone.properties通过添加新行来 编辑文件plugin.path=/git/1.libraries/kafka_2.12-2.2.0/plugins

步骤 4:我为 mongoDB 接收器添加新的配置文件 MongoSinkConnector.properties

name=mongo-sink
topics=test
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
tasks.max=1
key.ignore=true

# Specific global MongoDB Sink Connector configuration
connection.uri=mongodb://localhost:27017,mongo1:27017,mongo2:27017,mongo3:27017
database=test_kafka
collection=transaction
max.num.retries=3
retries.defer.timeout=5000
type.name=kafka-connect
Run Code Online (Sandbox Code Playgroud)

第五步:运行命令 bin\windows\connect-standalone.bat config\connect-standalone.properties config\MongoSinkConnector.properties

但是,我得到了错误

[2019-07-09 10:19:09,466] WARN The configuration 'offset.flush.interval.ms' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
[2019-07-09 10:19:09,467] WARN The configuration 'key.converter.schemas.enable' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
[2019-07-09 10:19:09,467] WARN The configuration 'offset.storage.file.filename' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
[2019-07-09 10:19:09,468] WARN The configuration 'value.converter.schemas.enable' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
[2019-07-09 10:19:09,469] WARN The configuration 'plugin.path' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
[2019-07-09 10:19:09,469] WARN The configuration 'value.converter' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
[2019-07-09 10:19:09,470] WARN The configuration 'key.converter' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
Jul 09, 2019 10:19:10 AM org.glassfish.jersey.internal.inject.Providers checkProviderRuntime
WARNING: A provider org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will be ignored.
Jul 09, 2019 10:19:10 AM org.glassfish.jersey.internal.inject.Providers checkProviderRuntime
WARNING: A provider org.apache.kafka.connect.runtime.rest.resources.RootResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider org.apache.kafka.connect.runtime.rest.resources.RootResource will be ignored.
Jul 09, 2019 10:19:10 AM org.glassfish.jersey.internal.inject.Providers checkProviderRuntime
WARNING: A provider org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource will be ignored.
Jul 09, 2019 10:19:11 AM org.glassfish.jersey.internal.Errors logErrors
WARNING: The following warnings have been detected: WARNING: The (sub)resource method listConnectors in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method createConnector in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method listConnectorPlugins in org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource contains empty path annotation.
WARNING: The (sub)resource method serverInfo in org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty path annotation.

[2019-07-09 10:19:12,302] ERROR WorkerSinkTask{id=mongo-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
        at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:344)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
        ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'this': was expecting 'null', 'true', 'false' or NaN
 at [Source: (byte[])"this is a message"; line: 1, column: 6]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'this': was expecting 'null', 'true', 'false' or NaN
 at [Source: (byte[])"this is a message"; line: 1, column: 6]
        at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804)
        at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:703)
        at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3532)
        at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3508)
        at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._matchToken2(UTF8StreamJsonParser.java:2843)
        at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._matchTrue(UTF8StreamJsonParser.java:2777)
        at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:807)
        at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:729)
        at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4042)
        at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2571)
        at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
        at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:342)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2019-07-09 10:19:12,305] ERROR WorkerSinkTask{id=mongo-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
Run Code Online (Sandbox Code Playgroud)

我设置错了什么配置或者我错过了什么?


我修好了它。现在,我可以成功地将数据从 Kafka 流式传输到 MongoDB

我的解决方法是:

  1. 将我的 kafka 移动到 C:\kafka_2.12-2.2.0
  2. 更新与新路径对应的 plugin_path
  3. 更新配置文件 connect-standalone.properties

Rob*_*att 1

MongoDB 本身有一个官方的源和接收器连接器。它可以在 Confluence Hub 上找到:https ://www.confluence.io/hub/mongodb/kafka-connect-mongodb

如果您不想使用 Confluence Platform,您可以自行部署 Apache Kafka - 它已经包含 Kafka Connect。您使用哪些插件(连接器)取决于您。在这种情况下,您将使用 Kafka Connect(Apache Kafka 的一部分)加上 kafka-connect-mongodb(由 MongoDB 提供)。

有关如何使用它的文档位于:https://docs.mongodb.com/kafka-connector/current/