使用3的Kafka集群和相同的Zookeeper集群,我提出了一个分布式连接器节点.此节点使用单个任务成功运行.然后我提出了第二个连接器,这似乎运行,因为任务中的一些代码肯定会运行.然而,它似乎没有活着(尽管没有抛出错误,由于缺乏预期的活动而没有保持活着,而第一个连接器继续正常运行).当我调用URL时http://localhost:8083/connectors/mqtt/tasks,在每个连接器节点上,它告诉我连接器有一个任务.我希望这是两个任务,每个节点/工作一个任务.(目前工作人员配置说,tasks.max = 1但我也尝试将其设置为3.
当我尝试启动第三个连接器时,我收到错误:
"POST /connectors HTTP/1.1" 500 90 5
(org.apache.kafka.connect.runtime.rest.RestServer:60)
ERROR IO error forwarding REST request:
(org.apache.kafka.connect.runtime.rest.RestServer:241)
java.net.ConnectException: Connection refused
Run Code Online (Sandbox Code Playgroud)
尝试再次从shell调用连接器POST方法返回错误:
{"error_code":500,"message":"IO Error trying to forward REST request:
Connection refused"}
Run Code Online (Sandbox Code Playgroud)
我还尝试升级到今天发布的Apache Kafka 0.10.1.1.我还在看问题.每个连接器都运行在由单个映像定义的隔离Docker容器上.它们应该完全相同.
问题可能是我正在尝试http://localhost:8083/connectors对每个工作程序运行POST请求,当我只需要在单个工作程序上运行一次,然后该连接器的任务将自动分发给其他工作程序.如果是这种情况,我该如何分配任务?我目前将max设置为3,但只有一个似乎在一个worker上运行.
我最终使用与Yuri建议的方法基本相同的方式运行.我给每个工作者一个唯一的组ID,然后给每个连接器任务指定相同的名称.这允许三个连接器及其单个任务共享一个偏移量,因此在接收器连接器的情况下,它们从Kafka消耗的消息不会重复.它们基本上作为独立连接器运行,因为工作人员具有不同的组ID,因此不会相互通信.
如果连接器工作程序具有相同的组ID,则无法添加多个具有相同名称的连接器.如果为连接器指定不同的名称,它们将具有不同的偏移量并消耗重复的消息.如果同一组中有三个工作人员,一个连接器和三个任务,理论上理论情况是任务共享一个偏移量,工作人员确保任务始终在运行并且分布均匀(每个任务都使用一个唯一的集合分区).实际上,连接器框架不会创建多个任务,即使tasks.max设置为3,主题任务消耗时也有25个分区.
如果有人知道我为什么会看到这种行为,请告诉我.
我试图重现[Databricks] [1]中的示例并将其应用于Kafka的新连接器并激发结构化流媒体,但我无法使用Spark中的开箱即用方法正确解析JSON ...
注意:该主题以JSON格式写入Kafka.
val ds1 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", IP + ":9092")
.option("zookeeper.connect", IP + ":2181")
.option("subscribe", TOPIC)
.option("startingOffsets", "earliest")
.option("max.poll.records", 10)
.option("failOnDataLoss", false)
.load()
Run Code Online (Sandbox Code Playgroud)
以下代码不起作用,我相信这是因为列json是一个字符串而且与from_json签名方法不匹配...
val df = ds1.select($"value" cast "string" as "json")
.select(from_json("json") as "data")
.select("data.*")
Run Code Online (Sandbox Code Playgroud)
有小费吗?
[更新]工作示例:https: //github.com/katsou55/kafka-spark-structured-streaming-example/blob/master/src/main/scala-2.11/Main.scala
scala apache-kafka apache-spark apache-kafka-connect spark-structured-streaming
当我以分布式模式(connect-runtime v1.0.0)启动连接器时,有几个必需的配置值.我说的是价值观:
offset.storage.topic
offset.storage.partitions
key.converter
config.storage.topic
config.storage.replication.factor
rest.port
status.storage.topic
key.converter.schemas.enable
value.converter.schemas.enable
internal.value.converter
internal.key.converter
internal.key.converter.schemas.enable
internal.value.converter.schemas.enable
status.storage.partitions
status.storage.topic
value.converter
offset.flush.interval.ms
offset.storage.replication.factor
...
Run Code Online (Sandbox Code Playgroud)
一旦连接器以这些属性的有意义值启动,它就会按预期工作.但是在启动时,日志变得充斥着像这样的条目
WARN o.a.k.c.admin.AdminClientConfig.logUnused - The configuration 'offset.storage.topic' was supplied but isn't a known config.
Run Code Online (Sandbox Code Playgroud)
对于上述所有,强制配置值.有三个配置类正在记录警告:
org.apache.kafka.clients.consumer.ConsumerConfig
org.apache.kafka.clients.admin.AdminClientConfig
org.apache.kafka.clients.producer.ProducerConfig
Run Code Online (Sandbox Code Playgroud)
从现在起我还没有找到这种行为的原因.什么在这里丢失或出了什么问题导致这个警告?我不得不担心这个警告吗?
我已经尝试了两个星期了,我在不同的机器上运行Kafka集群而不是我的连接节点.我无法正常运行连接.我可以读写卡夫卡没问题.Zookeeper似乎运行良好.
我启动连接:
$ bin/connect-distributed connect-distributed.properties
Run Code Online (Sandbox Code Playgroud)
Connect不断循环此错误:
[2018-08-21 15:45:12,161] INFO [Worker clientId=c1, groupId=connect-cluster] Discovered group coordinator 172.25.1.2:9092 (id: 2147483645 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:677)
[2018-08-21 15:45:12,163] INFO [Worker clientId=c1, groupId=connect-cluster] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:509)
[2018-08-21 15:45:12,165] INFO [Worker clientId=c1, groupId=connect-cluster] Group coordinator 172.25.1.2:9092 (id: 2147483645 rack: null) is unavailable or invalid, will attempt rediscovery (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:729)
[2018-08-21 15:45:12,266] INFO [Worker clientId=c1, groupId=connect-cluster] Discovered group coordinator 172.25.40.219:9092 (id: 2147483645 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:677)
[2018-08-21 15:45:12,267] INFO [Worker clientId=c1, groupId=connect-cluster] Group coordinator 172.25.1.2:9092 (id: 2147483645 rack: null) is …Run Code Online (Sandbox Code Playgroud) 我在分布式模式下使用Kafka Connect。我现在多次观察到的一个奇怪行为是,经过一段时间(可能是几个小时,可能是几天),似乎出现了平衡错误:将相同的任务分配给多个工作人员。结果,它们并发运行,并且取决于连接器的性质,它们会失败或产生“不可预测的”输出。
我能够用来重现此行为的最简单的配置是:两个Kafka Connect工作程序,两个连接器,每个连接器仅执行一项任务。Kafka Connect已部署到Kubernetes中。Kafka本身位于Confluent Cloud中。Kafka Connect和Kafka的版本相同(5.3.1)。
日志中的相关消息:
工人A:
[2019-10-30 12:44:23,925] INFO [Worker clientId=connect-1, groupId=some-kafka-connect-cluster] Successfully joined group with generation 488 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:469)
[2019-10-30 12:44:23,926] INFO [Worker clientId=connect-1, groupId=some-kafka-connect-cluster] Joined group at generation 488 and got assignment: Assignment{error=0, leader='connect-1-d5c19893-b33c-4f07-85fb-db9736795759', leaderUrl='http://10.16.0.15:8083/', offset=250, connectorIds=[some-hdfs-sink, some-mqtt-source], taskIds=[some-hdfs-sink-0, some-mqtt-source-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
Run Code Online (Sandbox Code Playgroud)
工人B:
[2019-10-30 12:44:23,930] INFO [Worker clientId=connect-1, groupId=some-kafka-connect-cluster] Successfully joined group with generation 488 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:469)
[2019-10-30 12:44:23,936] INFO [Worker clientId=connect-1, groupId=some-kafka-connect-cluster] Joined group at generation 488 and got assignment: Assignment{error=0, leader='connect-1-d5c19893-b33c-4f07-85fb-db9736795759', leaderUrl='http://10.16.0.15:8083/', offset=250, …Run Code Online (Sandbox Code Playgroud) 如何将 Kafka Connect 适配器与 Amazon MSK 配合使用?根据 AWS 文档,它支持 Kafka 连接,但没有记录如何设置适配器和使用它。
我们在一个项目中使用 Kafka Connect 一段时间了,目前完全只使用Confluence Kafka Connect JDBC 连接器。我正在努力理解“任务”在 Kafka Connect 中的作用,特别是对于这个连接器。我理解“连接器”;它们包含一系列有关特定源/接收器以及它们连接的主题的配置。我了解1:Many连接器和任务之间存在关系,以及任务用于并行工作的一般原则。但是,我们如何理解连接器何时会/可能创建多个任务?
在源连接器的情况下,我们使用 JDBC 连接器通过时间戳和/或主键获取源数据,因此这本质上看起来是顺序的。事实上,我们所有的源连接器似乎都只有一项任务。什么会触发 Kafka Connect 创建多个连接器?目前我们正在分布式模式下运行Kafka Connect ,但只有一个worker;如果我们有多个工作人员,每个连接器是否可以执行多个任务,或者两者不相关?
在接收器连接器的情况下,我们使用 显式配置每个接收器连接器tasks.max=1,因此毫不奇怪,我们也只看到每个连接器的一项任务。如果我们删除该配置,想必我们可以/将会获得多个任务。这是否意味着我们输入主题上的消息可能会被乱序消费?在这种情况下,如何保证变更的数据一致性?
此外,我们有时会看到单个连接器和任务都进入 FAILED 状态的情况(由于输入连接问题)。重新启动任务会将其从此状态中删除,并重新启动数据流,但连接器仍处于 FAILED 状态。这怎么可能——连接器的状态不只是其所有子任务的聚合吗?
启动Kafka Connect(connect-standalone)后,我的任务在开始后立即失败:
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:93)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:343)
at org.apache.kafka.common.network.Selector.poll(Selector.java:291)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:193)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:248)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:316)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:222)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)
在一些Kafka文档中提到了堆空间,告诉你使用"默认"进行尝试,只有在出现问题时才修改它,但是没有指令来修改堆空间.
我试图了解 Connect 为您购买了什么而 Streams 没有。我们有应用程序的一部分,我们想在其中使用一个主题并写入 mariadb。
我可以用一个简单的处理器来完成这个。读取记录,存储在状态存储中,然后批量插入到 mariadb 中。
为什么这是一个坏主意?JDBC Sink Connector 给你带来了什么?
似乎不可能使用 RegexRoute 用下划线替换主题名称中的所有点,因为 RegexRouter 调用replaceFirstnot replaceAll。有没有解决的办法?我的一个想法是通过变换进行多次传递:
{
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": "10",
"topics": "foo.bar.baz,some.topic",
"s3.region": "us-east-1",
"s3.bucket.name": "bucket",
"s3.part.size": "5242880",
"s3.compression.type": "gzip",
"timezone": "UTC",
"rotate.schedule.interval.ms": "900000",
"flush.size": "1000000",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"timestamp.extractor": "RecordField",
"timestamp.field": "time",
"schema.compatibility": "NONE",
"name": "s3-sink",
"transforms":"replaceFirstDot,replaceSecondDot",
"transforms.replaceFirstDot.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.replaceFirstDot.regex": "\\.",
"transforms.replaceFirstDot.replacement": "_",
"transforms.replaceSecondDot.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.replaceSecondDot.regex": "\\.",
"transforms.replaceSecondDot.replacement": "_"
}
Run Code Online (Sandbox Code Playgroud)
有没有一种简单的方法来包含自定义分区器或转换/路由器?