标签: confluent-platform

汇合平台vs阿帕奇卡夫卡

我是kafka的新手,我对Confluent平台感到好奇.

看来Confluent平台上的用户故事并不多.

汇合平台是否只为卡夫卡增加了更多的价值?

或者任何人都可以告诉我你最喜欢哪一个?

我必须选择其中一个.

apache-kafka confluent confluent-platform

26
推荐指数
1
解决办法
1万
查看次数

卡夫卡生产者刷新和轮询之间的区别

我们有一个 Kafka 消费者,它将读取消息并执行这些操作,然后使用以下脚本再次发布到 Kafka 主题

生产者配置:

{
  "bootstrap.servers": "localhost:9092"
}
Run Code Online (Sandbox Code Playgroud)

我还没有配置任何其他配置,如 queue.buffering.max.messages queue.buffering.max.ms batch.num.messages

我假设这些都将成为配置中的默认值

queue.buffering.max.messages : 100000
queue.buffering.max.ms : 0
batch.num.messages : 10000
Run Code Online (Sandbox Code Playgroud)

我的理解:当内部队列达到 queue.buffering.max.ms 或 batch.num.messages 中的任何一个时,消息将在单独的线程中发布到 Kafka。在我的配置中 queue.buffering.max.ms 是 0,所以当我调用 generate() 时每条消息都会被发布。如果我错了,请纠正我。

我的制作人片段:

def send(topic, message):
    p.produce(topic, json.dumps(message), callback=delivery_callback(err, msg))
    p.flush()
Run Code Online (Sandbox Code Playgroud)

这篇文章我了解到,在每条消息后使用刷新,生产者将成为同步生产者。如果我使用上面的脚本,发布到 Kafka 需要大约 45 毫秒

如果我将上面的代码段更改为

def send(topic, message):
    p.produce(topic, json.dumps(message), callback=delivery_callback(err, msg))
    p.poll(0)
Run Code Online (Sandbox Code Playgroud)

有什么性能会提高吗?你能澄清一下我的理解吗?

谢谢

python apache-kafka kafka-producer-api confluent-platform

13
推荐指数
1
解决办法
2万
查看次数

超过 max.poll.interval.ms 后,Kafka 消费者卡住了

当消费者在 5 分钟内(默认值 max.poll.interval.ms 300000ms)没有收到消息时,消费者会停止而不退出程序。消费者进程挂起,不再消费任何消息。

记录以下错误消息

MAXPOLL|rdkafka#consumer-1| [thrd:main]: Application maximum poll interval (300000ms) exceeded by 255ms (adjust max.poll.interval.ms for long-running message processing): leaving group
Run Code Online (Sandbox Code Playgroud)

我看到在 confluent-kafka-go 中ErrMaxPollExceeded定义了它,但无法找到它在哪里被提出。

如果出现任何此类错误,为什么程序不退出?

  • 卡夫卡 v1.1.0
  • librdkafka v1.0.0
  • confluent-kafka-go(主)

用于 kafka.Consumer 的配置

{
    "bootstrap.servers":    "private.kafka.host",
    "group.id":             "foo.bar",
    "auto.offset.reset":    "earliest",
    "enable.auto.commit":   false,
}
Run Code Online (Sandbox Code Playgroud)

go apache-kafka librdkafka confluent-platform

10
推荐指数
1
解决办法
3941
查看次数

System.ObjectDisposeException:句柄被破坏

当我们启动停止消费者服务的进程时,Kafka消费者应用程序抛出ObjectDisposeException。我想了解什么进程破坏了kafka句柄

System.ObjectDisposedException: handle is destroyed
at Confluent.Kafka.Impl.SafeKafkaHandle.ThrowIfHandleClosed()
at Confluent.Kafka.Consumer`2.Consume(Int32 millisecondsTimeout)
at Confluent.Kafka.Consumer`2.Consume(CancellationToken cancellationToken)
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-consumer-api confluent-platform

10
推荐指数
1
解决办法
2500
查看次数

AWS MSK 和 Confluence SchemaRegistry 以及 Confluence Kafka 连接如何建议一起使用?

我们计划将 AWS MSK 服务用于托管 Kafka 和架构注册表以及 Confluence 的 Kafka Connect 服务来运行我们的连接器(Elasticsearch Sink Connector)。我们计划在 EC2 中运行架构注册表和连接器。

根据 Confluence 团队的说法,如果我们对 Kafka 使用 MSK,他们将无法正式支持 Confluence Schema Registry 和 Kafka Connect。

那么,有谁可以分享一下他们的经验吗?就像 Anybuddy 在生产环境中组合使用 MSK 和 Confluence 服务一样吗?

使用这种组合有风险吗?

是否推荐使用这种组合?

如果我们遇到连接器方面的任何问题,Confluence 社区如何提供支持?

还有其他建议、意见或替代方案吗?

我们已经拥有 Confluence 企业平台许可证,但我们希望拥有托管 Kafka 服务,这就是我们选择 AWS MKS 的原因,因为根据我们的分析,它比 Confluence Cloud 非常经济高效?

请分享您的想法并提前致谢。

谢谢

apache-kafka confluent-schema-registry confluent-cloud aws-msk confluent-platform

10
推荐指数
1
解决办法
9100
查看次数

如何使用Python-kafka管理客户端在Kafka中创建主题?

是否有任何Python kafka管理客户端可用于从python程序创建主题/删除主题?我发现了一些 python api,但它们都没有可用的 Admin api?

Confluence 有 python admin api 吗?

apache-kafka kafka-consumer-api kafka-producer-api confluent-platform

9
推荐指数
2
解决办法
1万
查看次数

使用 RegexRouter 用下划线替换主题中的多个点

似乎不可能使用 RegexRoute 用下划线替换主题名称中的所有点,因为 RegexRouter 调用replaceFirstnot replaceAll。有没有解决的办法?我的一个想法是通过变换进行多次传递:

{
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "10",
    "topics": "foo.bar.baz,some.topic",
    "s3.region": "us-east-1",
    "s3.bucket.name": "bucket",
    "s3.part.size": "5242880",
    "s3.compression.type": "gzip",
    "timezone": "UTC",
    "rotate.schedule.interval.ms": "900000",
    "flush.size": "1000000",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "timestamp.extractor": "RecordField",
    "timestamp.field": "time",
    "schema.compatibility": "NONE",
    "name": "s3-sink",
    "transforms":"replaceFirstDot,replaceSecondDot",
    "transforms.replaceFirstDot.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.replaceFirstDot.regex": "\\.",
    "transforms.replaceFirstDot.replacement": "_",
    "transforms.replaceSecondDot.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.replaceSecondDot.regex": "\\.",
    "transforms.replaceSecondDot.replacement": "_"
}
Run Code Online (Sandbox Code Playgroud)

有没有一种简单的方法来包含自定义分区器或转换/路由器?

apache-kafka-connect confluent-platform

9
推荐指数
0
解决办法
434
查看次数

无需安装 Confluent Platform 即可使用 Confluent Hub

我正在使用这些安装说明获取融合中心客户端https://docs.confluent.io/current/connect/managing/confluent-hub/client.html

但是,当我开始安装 kafka-connect-elasticsearch 连接器时

confluent-hub install confluentinc/kafka-connect-elasticsearch:latest

我继续收到此错误消息:

Unable to detect Confluent Platform installation. Specify --component-dir and --worker-configs explicitly.

Error: Invalid options or arguments
Run Code Online (Sandbox Code Playgroud)

我在 Mac 上通过 Homebrew 安装了 ElasticSearch 和 Kafka

apache-kafka apache-kafka-connect confluent-platform

9
推荐指数
1
解决办法
4281
查看次数

如何在kafka中强制模式验证

我正在使用 Kafka 和架构注册表。定义了一个模式,在生产者端使用 confluence 的 KafkaAvroSerializer。一切正常。

另一方面,如果生产者在不遵守架构的情况下发布事件,则发布事件不会出现任何问题。

据了解,Kafka 仅获取序列化的二进制文件,不会检查数据和功能是否按设计工作。

想知道是否有更好的方法来强制执行更强大的模式验证,以便主题不会被不良数据污染?

apache-kafka confluent-schema-registry confluent-platform

9
推荐指数
1
解决办法
2万
查看次数

安装 kafka 连接器时出现 ConfluenceHubClient 异常

我已经设置了 Kafka 集群并安装confluent-hubEC2实例上。我已经下载了confluent-hubtar 文件,解压并将二进制文件放入/usr/local/bin.

当我现在尝试安装任何连接器时,我收到 ClassNotFoundException。

[ec2-user@ip-172-31-88-110 bin]$ confluent-hub install confluentinc/kafka-connect-jdb c:latest --component-dir /opt/connectors --worker-configs /etc/kafka/connect.properties

/usr/local/bin/confluent-hub: line 13: cd: /usr/local/bin/../share/java: No such file or directory
Error: Could not find or load main class io.confluent.connect.hub.cli.ConfluentHubClient
Caused by: java.lang.ClassNotFoundException: io.confluent.connect.hub.cli.ConfluentHubClient
Run Code Online (Sandbox Code Playgroud)

我哪里出错了?

apache-kafka-connect mongodb-kafka-connector confluent-platform

8
推荐指数
2
解决办法
2966
查看次数