我正在尝试找到一种解决方案,将数据从 Kafka 直接流式传输到 Oracle。最有效的解决方案是什么?
apache-kafka apache-kafka-streams spring-kafka apache-kafka-connect
只是想知道在使用 Kafka Streams Builder 编写持久层时可能会遇到什么样的问题,或者有什么使用 Kafka Connect JDBC (Sink) 的建议吗?
我有一个场景,我们使用 FluentD 代理将应用程序日志转发到 Kafka 主题,因为 Kafka 团队引入了 Kerberos 身份验证,而 fluenceD 版本不支持此身份验证,我无法直接使用转发日志。现在我们引入了一个没有身份验证的新 Kafka 服务器,并在那里创建了一个主题,我想使用 Kafka 连接器将消息从新服务器中的这个主题转发到另一个服务器中的另一个主题,想知道如何实现这一点?
我想杀死我的 Kafka Connect 分布式工作程序,但我无法(或者我不知道如何)确定在 Linux 中运行的哪个进程是该工作程序。
跑步时
ps aux | grep worker
Run Code Online (Sandbox Code Playgroud)
我确实看到很多工作进程,但不确定哪个是连接工作进程,哪些是标准非连接工作进程
确实,昨天只启动了其中一个进程,我怀疑就是这个,但这显然不是所有情况下的充分条件,例如,如果 Kafka 集群昨天上线。那么,一般来说,我如何确定哪个进程是 Kafka Connect Worker?
这里的万无一失的方法是什么?
我想在使用 Kafka Connect 源 JDBC 连接器导入表时设置消息键。
当定义了多个表以从 JDBC 连接器读取时,如何将 Kafka Connect/Source 中的单消息转换 (SMT) 定位到正确的字段?SMT 需要一个列名,当有多个表时,该列名可能会有所不同。
我没有看到基于表名或类似名称过滤 SMT 定义的方法。下面的代码示例工作正常,因为它只有一张表。
但是如果你有不同的表,比如 User, Order, Product 怎么办?
"table.whitelist" : "User"
"transforms":"createKey,extract",
"transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields":"user_id",
"transforms.extract.type":"org.apache.kafka.connect.transforms.ExtractField\$Key",
"transforms.extract.field":"user_id",
Run Code Online (Sandbox Code Playgroud)
当具有该配置的工作任务遇到没有该 user_id 字段的表时,它会崩溃并保持 FAILED 状态
org.apache.kafka.connect.errors.ConnectException:
Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:293)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:229)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)\nCaused by: java.lang.NullPointerException
at org.apache.kafka.connect.transforms.ValueToKey.applyWithSchema(ValueToKey.java:85)
at org.apache.kafka.connect.transforms.ValueToKey.apply(ValueToKey.java:65)
at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 11 …Run Code Online (Sandbox Code Playgroud) 我的目标是使用容器部署 Kafka 连接连接器(s3 接收器),因此了解有关工作人员可能会或可能不会直接相互通信的详细信息非常重要。
从我读到的内容来看,我无法确定工作人员之间是否会进行直接通信(例如一名工作人员使用 REST 与另一名工作人员交谈,或其他)。
我一开始认为,当我使用 REST 更新配置时,配置更改是使用 connect_config 内部主题和/或使用 Kafka 消费者协调器传播的。
但从我读到的:
rest.advertised.host.name
If this is set, this is the hostname that will be given out to other workers to connect to.
Run Code Online (Sandbox Code Playgroud)
有关工人如何以及为什么(如果这样做)彼此沟通的任何详细信息?
我当前的测试配置如下所示:
version: '3.7'
services:
postgres:
image: debezium/postgres
restart: always
ports:
- "5432:5432"
zookeeper:
image: debezium/zookeeper
ports:
- "2181:2181"
- "2888:2888"
- "3888:3888"
kafka:
image: debezium/kafka
restart: always
ports:
- "9092:9092"
links:
- zookeeper
depends_on:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_GROUP_MIN_SESSION_TIMEOUT_MS=250
connect:
image: debezium/connect
restart: always
ports:
- "8083:8083"
links:
- zookeeper
- postgres
- kafka
depends_on:
- zookeeper
- postgres
- kafka
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_source_connect_statuses
Run Code Online (Sandbox Code Playgroud)
我像这样使用 docker-compose 运行它:
$ docker-compose up …Run Code Online (Sandbox Code Playgroud) 我有一个正在使用的 jdbc 源连接器,并且我一直在使用 Postman 来测试和设置它。db 连接的密码是明文,只要我是唯一看到它的人就可以了。
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:db2://db2server.mycompany.com:4461/myDB",
"connection.user: "dbUser",
"connection.password": "DBPASSinClearText!",
"dialect.name": "Db2DatabaseDialect",
"mode": "timestamp",
"query": "select fname, lname, custId, custRegion, lastUpdate from CustomerMaster",
"timestamp.column.name": "lastUpdate",
"table.types": "TABLE",
"topic.prefix": "master.customer"
}
}
Run Code Online (Sandbox Code Playgroud)
在生产中,部署团队是设置配置的人。有没有办法对此进行加密,这样我就不必与该团队共享明文密码?
使用 Spark 流,我可以读取 Kafka 消息并将数据写入不同类型的表,例如 HBase、Hive 和 Kudu。但这也可以通过对这些表使用 Kafka 连接器来完成。我的问题是,在哪些情况下我应该更喜欢连接器而不是 Spark 流解决方案。
Kafka 连接器解决方案的容忍度如何?我们知道通过 Spark 流,我们可以使用在多个节点上运行的检查点和执行器进行容错执行,但是 Kafka 连接器如何实现容错(如果可能)?通过在多个节点上运行连接器?
apache-kafka apache-spark apache-kafka-connect spark-structured-streaming
我试图在执行 connect-distributed 命令后创建一个 kafka 连接器。我写了一个 entrypoint.sh 脚本并将它与 CMD 一起使用。我有这样的 docker 文件:
FROM confluentinc/cp-kafka
RUN mkdir /plugins
RUN mkdir /config
COPY kafka-connect-couchbase-*.jar /plugins/
COPY config /config/
RUN chmod +x /config/stage/entrypoint.sh
ENV EXPOSED_PORT 8083
CMD /config/stage/entrypoint.sh
Run Code Online (Sandbox Code Playgroud)
我有入口点脚本文件为:
connect-distributed config/"${DEPLOY_ENV}"/connect-distributed.properties
curl -X POST -H "Content-Type: application/json" -d @config.json http://localhost:8083/connectors
Run Code Online (Sandbox Code Playgroud)
deploy_env 无关紧要,它来自 jenkins。config 文件和distributed.properties 也无关紧要,它是正确的,我手动尝试过。
Kafka connect 启动没有问题,但是用于创建连接器的curl命令无效。
简而言之,我想在 connect-distributed 启动后创建一个连接器,而不在容器外执行任何休息请求。我如何做到这一点?
apache-kafka ×10
apache-spark ×1
debezium ×1
docker ×1
jdbc ×1
nomad ×1
postgresql ×1
spring-kafka ×1