Tag: apache-kafka-connect

Kafka Connect running out of heap space

After starting Kafka Connect (connect-standalone), my task fails immediately after it starts:

java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:93)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:343)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:291)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:193)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:248)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:316)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:222)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Some of the Kafka documentation mentions heap space, telling you to try the default and only change it if you run into problems, but it gives no instructions on how to actually change the heap size.
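For reference, the heap size is controlled by the KAFKA_HEAP_OPTS environment variable, which the stock shell scripts (connect-standalone via kafka-run-class) pass straight to the JVM. A minimal sketch; the sizes are arbitrary and the two file names are placeholders:

# connect-standalone.sh applies its default heap only when KAFKA_HEAP_OPTS
# is unset, so exporting the variable overrides the default
export KAFKA_HEAP_OPTS="-Xms512M -Xmx4G"
connect-standalone worker.properties my-connector.properties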

java heap jvm apache-kafka apache-kafka-connect

9 votes · 3 answers · 20k views

Kafka Connect runs out of Java heap space after enabling SSL

I recently enabled SSL and tried to start Kafka Connect in distributed mode. When running

connect-distributed connect-distributed.properties

I get the following error:

[2018-10-09 16:50:57,190] INFO Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask:106)
[2018-10-09 16:50:55,471] ERROR WorkerSinkTask{id=sink-mariadb-test} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
java.lang.OutOfMemoryError: Java heap space
        at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
        at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
        at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:344)
        at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:305)
        at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:560)
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:496)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:425)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:271)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:218)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:230)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:314)
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:444)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:317)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) …
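For context, this stack trace is the classic symptom of a protocol mismatch: a client speaking plaintext to an SSL listener reads the TLS handshake bytes as a record length and tries to allocate a huge buffer. A minimal sketch of the worker settings involved, with hypothetical truststore path and password; note that the consumer/producer clients embedded in the tasks need the SSL settings too, under the consumer. and producer. prefixes:

# connect-distributed.properties -- the worker's own connections
security.protocol=SSL
# truststore path and password below are placeholders
ssl.truststore.location=/etc/kafka/secrets/truststore.jks
ssl.truststore.password=changeit
# same settings for the clients embedded in sink/source tasks
consumer.security.protocol=SSL
consumer.ssl.truststore.location=/etc/kafka/secrets/truststore.jks
consumer.ssl.truststore.password=changeit
producer.security.protocol=SSL
producer.ssl.truststore.location=/etc/kafka/secrets/truststore.jks
producer.ssl.truststore.password=changeit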

java heap-memory apache-kafka apache-kafka-connect

9 votes · 1 answer · 6,635 views

Using RegexRouter to replace multiple dots in a topic name with underscores

It seems impossible to use RegexRouter to replace all of the dots in a topic name with underscores, because RegexRouter calls replaceFirst, not replaceAll. Is there a way around this? One idea I had was to make multiple passes through the transform:

{
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "10",
    "topics": "foo.bar.baz,some.topic",
    "s3.region": "us-east-1",
    "s3.bucket.name": "bucket",
    "s3.part.size": "5242880",
    "s3.compression.type": "gzip",
    "timezone": "UTC",
    "rotate.schedule.interval.ms": "900000",
    "flush.size": "1000000",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "timestamp.extractor": "RecordField",
    "timestamp.field": "time",
    "schema.compatibility": "NONE",
    "name": "s3-sink",
    "transforms":"replaceFirstDot,replaceSecondDot",
    "transforms.replaceFirstDot.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.replaceFirstDot.regex": "\\.",
    "transforms.replaceFirstDot.replacement": "_",
    "transforms.replaceSecondDot.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.replaceSecondDot.regex": "\\.",
    "transforms.replaceSecondDot.replacement": "_"
}

Is there a simple way to include a custom partitioner or transform/router?
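For what it's worth, a custom SMT is small enough to sketch here; a hypothetical Transformation (package and class name invented for illustration) that swaps every dot for an underscore in a single pass:

package com.example.smt;  // hypothetical package

import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

// Routes each record to its original topic name with every dot replaced
// by an underscore -- the replaceAll behavior RegexRouter lacks.
public class DotToUnderscoreRouter<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public R apply(R record) {
        // newRecord copies the record, changing only the topic name
        return record.newRecord(
                record.topic().replace('.', '_'),
                record.kafkaPartition(),
                record.keySchema(), record.key(),
                record.valueSchema(), record.value(),
                record.timestamp());
    }

    @Override
    public ConfigDef config() { return new ConfigDef(); }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}

Packaged into a jar on the worker's plugin.path, it would be wired up with "transforms": "dotsToUnderscores" and "transforms.dotsToUnderscores.type": "com.example.smt.DotToUnderscoreRouter".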

apache-kafka-connect confluent-platform

9 votes · 0 answers · 434 views

Using Confluent Hub without installing Confluent Platform

I am using these installation instructions to get the Confluent Hub client: https://docs.confluent.io/current/connect/managing/confluent-hub/client.html

However, when I try to install the kafka-connect-elasticsearch connector with

confluent-hub install confluentinc/kafka-connect-elasticsearch:latest

I keep getting this error message:

Unable to detect Confluent Platform installation. Specify --component-dir and --worker-configs explicitly.

Error: Invalid options or arguments

I installed Elasticsearch and Kafka on a Mac via Homebrew.
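A sketch of spelling out the two flags the error message asks for, with Homebrew-ish placeholder paths (point them at wherever your Kafka actually lives):

confluent-hub install confluentinc/kafka-connect-elasticsearch:latest \
  --component-dir /usr/local/share/kafka-plugins \
  --worker-configs /usr/local/etc/kafka/connect-distributed.properties

The directory given to --component-dir also has to be listed in the worker's plugin.path for Connect to find the installed connector.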

apache-kafka apache-kafka-connect confluent-platform

9 votes · 1 answer · 4,281 views

When does Kafka leader election happen?

When and how often does the Kafka high-level producer elect the leader? Does this happen before each message is sent, or only once when the connection is created?

apache-kafka kafka-producer-api apache-kafka-connect

8 votes · 1 answer · 6,516 views

Securing access to Kafka Connect's REST API

The REST API for Kafka Connect is not secured or authenticated. Since it is unauthenticated, anyone can easily retrieve the configuration of a connector or task. Because these configurations may contain details about how to access the source system (in the case of a SourceConnector) and the target system (in the case of a SinkConnector), is there a standard way to restrict access to these APIs?
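One standard option (hedged; it depends on your Connect version shipping the basic-auth extension) is BasicAuthSecurityRestExtension, enabled in the worker config and backed by a JAAS login module. Paths and file names below are placeholders:

# connect-distributed.properties
rest.extension.classes=org.apache.kafka.connect.rest.basic.auth.extension.BasicAuthSecurityRestExtension

# connect-jaas.conf, passed to the worker JVM with
# -Djava.security.auth.login.config=/etc/kafka/connect-jaas.conf
KafkaConnect {
  org.apache.kafka.connect.rest.basic.auth.extension.PropertyFileLoginModule required
  file="/etc/kafka/connect.credentials";
};

# connect.credentials holds user=password lines, one per allowed user

The usual alternative is to put a TLS-terminating reverse proxy in front of the REST port and handle authentication there.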

rest apache-kafka apache-kafka-connect

8 votes · 2 answers · 3,377 views

What is a simple, effective way to debug custom Kafka connectors?

I am working with a couple of Kafka connectors. I don't see any errors in the console output during their creation and deployment, but I am not getting the results I am looking for (no results whatsoever, desired or otherwise). I based these connectors on Kafka's example FileStream connectors, so my debugging technique was based on the use of the SLF4J Logger that the examples use. I've searched the console output for the log messages I thought these would generate, to no avail. Am I looking for these messages in the wrong place? Or is there a better way to debug these connectors?

Example usages of the SLF4J Logger that I referenced for my implementation:

Kafka FileStreamSinkTask

Kafka FileStreamSourceTask
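For what it's worth, Connect logs through log4j, configured by config/connect-log4j.properties (the stock scripts point KAFKA_LOG4J_OPTS at that file). A sketch of turning a custom connector's own package up to DEBUG, with a hypothetical package name:

# config/connect-log4j.properties
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
# raise the level for your own connector classes only
log4j.logger.com.example.myconnector=DEBUG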

java debugging slf4j apache-kafka apache-kafka-connect

8 votes · 1 answer · 8,448 views

Kafka JDBC sink connector: no tasks assigned

I am trying to start a JDBC sink connector with the following configuration:

{
    "name": "crm_data-sink_hh",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": 6,
        "topics": "crm_account,crm_competitor,crm_event,crm_event_participation",

        "connection.url": "jdbc:postgresql://db_host/hh?prepareThreshold=0",
        "connection.user": "db_user",
        "connection.password": "${file:db_hh_kafka_connect_pass}",
        "dialect.name": "PostgreSqlDatabaseDialect",

        "insert.mode": "upsert",
        "pk.mode": "record_value",
        "pk.fields": "guid",

        "errors.tolerance": "all",
        "errors.log.enable":true,
        "errors.log.include.messages":true,

        "errors.deadletterqueue.topic.name":"crm_data_deadletterqueue",
        "errors.deadletterqueue.context.headers.enable":true
    }
}

But no tasks are running, even though the connector is in the RUNNING state:

curl -X GET http://kafka-connect:10900/connectors/crm_data-sink_hh/status
{"name":"crm_data-sink_hh","connector":{"state":"RUNNING","worker_id":"172.16.24.14:10900"},"tasks":[],"type":"sink"}

I have run into this problem many times, and I'm confused because it happens randomly. My problem is very similar to this question. I would appreciate any help!
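For reference, the REST calls for inspecting and kicking the connector (host and port as above):

# restart the connector instance
curl -X POST http://kafka-connect:10900/connectors/crm_data-sink_hh/restart
# list whatever tasks actually exist, then restart one explicitly if present
curl -s http://kafka-connect:10900/connectors/crm_data-sink_hh/tasks
curl -X POST http://kafka-connect:10900/connectors/crm_data-sink_hh/tasks/0/restart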


Update, 11/04/2019 (unfortunately, I only have INFO-level logs at the moment):

Finally, after several attempts, I tried to get the connector crm_data-sink_db_hh to run tasks by updating the existing connector's configuration:

$ curl -X GET http://docker61:10900/connectors/crm_data-sink_db_hh/status
{"name":"crm_data-sink_db_hh","connector":{"state":"RUNNING","worker_id":"192.168.1.198:10900"},"tasks":[],"type":"sink"}

$ curl -X GET http://docker61:10900/connectors/crm_data-sink_db_hh/status
{"name":"crm_data-sink_db_hh","connector":{"state":"RUNNING","worker_id":"192.168.1.198:10900"},"tasks":[],"type":"sink"}

$ curl -X PUT -d @new_config.json http://docker21:10900/connectors/crm_data-sink_db_hh/config -H 'Content-Type: application/json'

$ curl -X GET http://docker61:10900/connectors/crm_data-sink_db_hh/status
{"name":"crm_data-sink_db_hh","connector":{"state":"UNASSIGNED","worker_id":"192.168.1.198:10900"},"tasks":[],"type":"sink"}

$ curl …

apache-kafka apache-kafka-connect

8 votes · 1 answer · 3,450 views

Can we update/upsert records in MongoDB? The data source is Kafka

Can we update/upsert records in MongoDB? Is there any method or function that can update or upsert documents directly in MongoDB, where the source system is Kafka and the target is MongoDB?
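For reference, the official MongoDB sink connector does support this through its write-model strategies. A hedged sketch (class and property names as I recall them from the mongodb-kafka connector, so verify against your version; the URI, database, collection, and topic are placeholders):

{
    "name": "mongo-upsert-sink",
    "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
        "topics": "my-topic",
        "connection.uri": "mongodb://mongo1:27017",
        "database": "mydb",
        "collection": "mycoll",
        "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy",
        "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy"
    }
}

ReplaceOneDefaultStrategy issues a replaceOne with upsert semantics, keyed by the document id that the id strategy produces.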

apache-kafka apache-kafka-connect mongodb-kafka-connector

8 votes · 1 answer · 1,400 views

"The $changeStream stage is only supported on replica sets" error when using mongodb-source-connect

Happy New Year!

I'm here because I ran into an error while running the Kafka MongoDB source connector. I am trying to run Connect in standalone mode with connect-avro-standalone.properties and MongoSourceConnector.properties, so that data written to MongoDB gets written to a Kafka topic.

While attempting this, I ran into the following error, and I couldn't find an answer, so I'm asking here.

This is what I'm trying to do:

bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties share/confluent-hub-components/mongodb-kafka-connect-mongodb/etc/MongoSourceConnector.properties

This is connect-avro-standalone.properties:

# Sample configuration for a standalone Kafka Connect worker that uses Avro serialization and
# integrates with the Schema Registry. This sample configuration assumes a local installation of
# Confluent Platform with all services running on their default ports.

# Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.
bootstrap.servers=localhost:9092

# The converters specify the format of data in Kafka and how …
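For context, change streams (which the source connector is built on) only work against a replica set, and a single-node replica set is enough for local development. A sketch, assuming a local mongod and default paths:

# start mongod as a single-node replica set, then initiate it once
mongod --replSet rs0 --dbpath /data/db --port 27017
mongo --eval 'rs.initiate()'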

mongodb avro apache-kafka apache-kafka-connect

8 votes · 2 answers · 10k views