标签: apache-kafka-connect

第二和第三个分布式Kafka Connector工作程序无法正常工作

使用3的Kafka集群和相同的Zookeeper集群,我提出了一个分布式连接器节点.此节点使用单个任务成功运行.然后我提出了第二个连接器,这似乎运行,因为任务中的一些代码肯定会运行.然而,它似乎没有活着(尽管没有抛出错误,由于缺乏预期的活动而没有保持活着,而第一个连接器继续正常运行).当我调用URL时http://localhost:8083/connectors/mqtt/tasks,在每个连接器节点上,它告诉我连接器有一个任务.我希望这是两个任务,每个节点/工作一个任务.(目前工作人员配置说,tasks.max = 1但我也尝试将其设置为3.

当我尝试启动第三个连接器时,我收到错误:

"POST /connectors HTTP/1.1" 500 90  5 
(org.apache.kafka.connect.runtime.rest.RestServer:60)

ERROR IO error forwarding REST request: 
(org.apache.kafka.connect.runtime.rest.RestServer:241) 
java.net.ConnectException: Connection refused
Run Code Online (Sandbox Code Playgroud)

尝试再次从shell调用连接器POST方法返回错误:

 {"error_code":500,"message":"IO Error trying to forward REST request:
 Connection refused"}
Run Code Online (Sandbox Code Playgroud)

我还尝试升级到今天发布的Apache Kafka 0.10.1.1.我还在看问题.每个连接器都运行在由单个映像定义的隔离Docker容器上.它们应该完全相同.

问题可能是我正在尝试http://localhost:8083/connectors对每个工作程序运行POST请求,当我只需要在单个工作程序上运行一次,然后该连接器的任务将自动分发给其他工作程序.如果是这种情况,我该如何分配任务?我目前将max设置为3,但只有一个似乎在一个worker上运行.

更新

我最终使用与Yuri建议的方法基本相同的方式运行.我给每个工作者一个唯一的组ID,然后给每个连接器任务指定相同的名称.这允许三个连接器及其单个任务共享一个偏移量,因此在接收器连接器的情况下,它们从Kafka消耗的消息不会重复.它们基本上作为独立连接器运行,因为工作人员具有不同的组ID,因此不会相互通信.

如果连接器工作程序具有相同的组ID,则无法添加多个具有相同名称的连接器.如果为连接器指定不同的名称,它们将具有不同的偏移量并消耗重复的消息.如果同一组中有三个工作人员,一个连接器和三个任务,理论上理论情况是任务共享一个偏移量,工作人员确保任务始终在运行并且分布均匀(每个任务都使用一个唯一的集合分区).实际上,连接器框架不会创建多个任务,即使tasks.max设置为3,主题任务消耗时也有25个分区.

如果有人知道我为什么会看到这种行为,请告诉我.

apache-kafka docker apache-kafka-connect

10
推荐指数
1
解决办法
1441
查看次数

如何使用from_json与Kafka connect 0.10和Spark Structured Streaming?

我试图重现[Databricks] [1]中的示例并将其应用于Kafka的新连接器并激发结构化流媒体,但我无法使用Spark中的开箱即用方法正确解析JSON ...

注意:该主题以JSON格式写入Kafka.

val ds1 = spark
          .readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", IP + ":9092")
          .option("zookeeper.connect", IP + ":2181")
          .option("subscribe", TOPIC)
          .option("startingOffsets", "earliest")
          .option("max.poll.records", 10)
          .option("failOnDataLoss", false)
          .load()
Run Code Online (Sandbox Code Playgroud)

以下代码不起作用,我相信这是因为列json是一个字符串而且与from_json签名方法不匹配...

    val df = ds1.select($"value" cast "string" as "json")
                .select(from_json("json") as "data")
                .select("data.*")
Run Code Online (Sandbox Code Playgroud)

有小费吗?

[更新]工作示例:https: //github.com/katsou55/kafka-spark-structured-streaming-example/blob/master/src/main/scala-2.11/Main.scala

scala apache-kafka apache-spark apache-kafka-connect spark-structured-streaming

10
推荐指数
1
解决办法
6699
查看次数

"配置foo.bar已提供,但不是已知的配置"

当我以分布式模式(connect-runtime v1.0.0)启动连接器时,有几个必需的配置值.我说的是价值观:

offset.storage.topic
offset.storage.partitions
key.converter
config.storage.topic
config.storage.replication.factor
rest.port
status.storage.topic
key.converter.schemas.enable
value.converter.schemas.enable
internal.value.converter
internal.key.converter
internal.key.converter.schemas.enable
internal.value.converter.schemas.enable
status.storage.partitions
status.storage.topic
value.converter
offset.flush.interval.ms
offset.storage.replication.factor
...
Run Code Online (Sandbox Code Playgroud)

一旦连接器以这些属性的有意义值启动,它就会按预期工作.但是在启动时,日志变得充斥着像这样的条目

WARN  o.a.k.c.admin.AdminClientConfig.logUnused - The configuration 'offset.storage.topic' was supplied but isn't a known config.
Run Code Online (Sandbox Code Playgroud)

对于上述所有,强制配置值.有三个配置类正在记录警告:

org.apache.kafka.clients.consumer.ConsumerConfig
org.apache.kafka.clients.admin.AdminClientConfig
org.apache.kafka.clients.producer.ProducerConfig
Run Code Online (Sandbox Code Playgroud)

从现在起我还没有找到这种行为的原因.什么在这里丢失或出了什么问题导致这个警告?我不得不担心这个警告吗?

apache-kafka apache-kafka-connect

10
推荐指数
1
解决办法
6292
查看次数

Kafka Connect分布式模式组协调器不可用

我已经尝试了两个星期了,我在不同的机器上运行Kafka集群而不是我的连接节点.我无法正常运行连接.我可以读写卡夫卡没问题.Zookeeper似乎运行良好.

我启动连接:

$ bin/connect-distributed connect-distributed.properties
Run Code Online (Sandbox Code Playgroud)

Connect不断循环此错误:

[2018-08-21 15:45:12,161] INFO [Worker clientId=c1, groupId=connect-cluster] Discovered group coordinator 172.25.1.2:9092 (id: 2147483645 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:677)
[2018-08-21 15:45:12,163] INFO [Worker clientId=c1, groupId=connect-cluster] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:509)
[2018-08-21 15:45:12,165] INFO [Worker clientId=c1, groupId=connect-cluster] Group coordinator 172.25.1.2:9092 (id: 2147483645 rack: null) is unavailable or invalid, will attempt rediscovery (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:729)
[2018-08-21 15:45:12,266] INFO [Worker clientId=c1, groupId=connect-cluster] Discovered group coordinator 172.25.40.219:9092 (id: 2147483645 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:677)
[2018-08-21 15:45:12,267] INFO [Worker clientId=c1, groupId=connect-cluster] Group coordinator 172.25.1.2:9092 (id: 2147483645 rack: null) is …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-connect

10
推荐指数
1
解决办法
5587
查看次数

Kafka Connect将同一任务分配给多个工作人员

我在分布式模式下使用Kafka Connect。我现在多次观察到的一个奇怪行为是,经过一段时间(可能是几个小时,可能是几天),似乎出现了平衡错误:将相同的任务分配给多个工作人员。结果,它们并发运行,并且取决于连接器的性质,它们会失败或产生“不可预测的”输出。

我能够用来重现此行为的最简单的配置是:两个Kafka Connect工作程序,两个连接器,每个连接器仅执行一项任务。Kafka Connect已部署到Kubernetes中。Kafka本身位于Confluent Cloud中。Kafka Connect和Kafka的版本相同(5.3.1)。

日志中的相关消息:

工人A:

[2019-10-30 12:44:23,925] INFO [Worker clientId=connect-1, groupId=some-kafka-connect-cluster] Successfully joined group with generation 488 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:469)
[2019-10-30 12:44:23,926] INFO [Worker clientId=connect-1, groupId=some-kafka-connect-cluster] Joined group at generation 488 and got assignment: Assignment{error=0, leader='connect-1-d5c19893-b33c-4f07-85fb-db9736795759', leaderUrl='http://10.16.0.15:8083/', offset=250, connectorIds=[some-hdfs-sink, some-mqtt-source], taskIds=[some-hdfs-sink-0, some-mqtt-source-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
Run Code Online (Sandbox Code Playgroud)

工人B:

[2019-10-30 12:44:23,930] INFO [Worker clientId=connect-1, groupId=some-kafka-connect-cluster] Successfully joined group with generation 488 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:469)
[2019-10-30 12:44:23,936] INFO [Worker clientId=connect-1, groupId=some-kafka-connect-cluster] Joined group at generation 488 and got assignment: Assignment{error=0, leader='connect-1-d5c19893-b33c-4f07-85fb-db9736795759', leaderUrl='http://10.16.0.15:8083/', offset=250, …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-connect

10
推荐指数
1
解决办法
158
查看次数

Kafka Connect 与 Amazon MSK

如何将 Kafka Connect 适配器与 Amazon MSK 配合使用?根据 AWS 文档,它支持 Kafka 连接,但没有记录如何设置适配器和使用它。

apache-kafka apache-kafka-connect amazon-msk

10
推荐指数
2
解决办法
6930
查看次数

Kafka Connect 中的连接器和任务之间是什么关系?

我们在一个项目中使用 Kafka Connect 一段时间了,目前完全只使用Confluence Kafka Connect JDBC 连接器。我正在努力理解“任务”在 Kafka Connect 中的作用,特别是对于这个连接器。我理解“连接器”;它们包含一系列有关特定源/接收器以及它们连接的主题的配置。我了解1:Many连接器和任务之间存在关系,以及任务用于并行工作的一般原则。但是,我们如何理解连接器何时会/可能创建多个任务?

  • 在源连接器的情况下,我们使用 JDBC 连接器通过时间戳和/或主键获取源数据,因此这本质上看起来是顺序的。事实上,我们所有的源连接器似乎都只有一项任务。什么会触发 Kafka Connect 创建多个连接器?目前我们正在分布式模式下运行Kafka Connect ,但只有一个worker;如果我们有多个工作人员,每个连接器是否可以执行多个任务,或者两者不相关?

  • 在接收器连接器的情况下,我们使用 显式配置每个接收器连接器tasks.max=1,因此毫不奇怪,我们也只看到每个连接器的一项任务。如果我们删除该配置,想必我们可以/将会获得多个任务。这是否意味着我们输入主题上的消息可能会被乱序消费?在这种情况下,如何保证变更的数据一致性?

此外,我们有时会看到单个连接器和任务都进入 FAILED 状态的情况(由于输入连接问题)。重新启动任务会将其从此状态中删除,并重新启动数据流,但连接器仍处于 FAILED 状态。这怎么可能——连接器的状态不只是其所有子任务的聚合吗?

apache-kafka apache-kafka-connect

10
推荐指数
1
解决办法
4940
查看次数

Kafka Connect耗尽堆空间

启动Kafka Connect(connect-standalone)后,我的任务在开始后立即失败:

java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:93)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:343)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:291)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:193)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:248)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:316)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:222)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)

在一些Kafka文档中提到了堆空间,告诉你使用"默认"进行尝试,只有在出现问题时才修改它,但是没有指令来修改堆空间.

java heap jvm apache-kafka apache-kafka-connect

9
推荐指数
3
解决办法
2万
查看次数

Kafka Connect vs Streams for sinks

我试图了解 Connect 为您购买了什么而 Streams 没有。我们有应用程序的一部分,我们想在其中使用一个主题并写入 mariadb。

我可以用一个简单的处理器来完成这个。读取记录,存储在状态存储中,然后批量插入到 mariadb 中。

为什么这是一个坏主意?JDBC Sink Connector 给你带来了什么?

apache-kafka apache-kafka-streams apache-kafka-connect

9
推荐指数
1
解决办法
3695
查看次数

使用 RegexRouter 用下划线替换主题中的多个点

似乎不可能使用 RegexRoute 用下划线替换主题名称中的所有点,因为 RegexRouter 调用replaceFirstnot replaceAll。有没有解决的办法?我的一个想法是通过变换进行多次传递:

{
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "10",
    "topics": "foo.bar.baz,some.topic",
    "s3.region": "us-east-1",
    "s3.bucket.name": "bucket",
    "s3.part.size": "5242880",
    "s3.compression.type": "gzip",
    "timezone": "UTC",
    "rotate.schedule.interval.ms": "900000",
    "flush.size": "1000000",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "timestamp.extractor": "RecordField",
    "timestamp.field": "time",
    "schema.compatibility": "NONE",
    "name": "s3-sink",
    "transforms":"replaceFirstDot,replaceSecondDot",
    "transforms.replaceFirstDot.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.replaceFirstDot.regex": "\\.",
    "transforms.replaceFirstDot.replacement": "_",
    "transforms.replaceSecondDot.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.replaceSecondDot.regex": "\\.",
    "transforms.replaceSecondDot.replacement": "_"
}
Run Code Online (Sandbox Code Playgroud)

有没有一种简单的方法来包含自定义分区器或转换/路由器?

apache-kafka-connect confluent-platform

9
推荐指数
0
解决办法
434
查看次数