我是kafka的新手,我对Confluent平台感到好奇.
看来Confluent平台上的用户故事并不多.
汇合平台是否只为卡夫卡增加了更多的价值?
或者任何人都可以告诉我你最喜欢哪一个?
我必须选择其中一个.
我们有一个 Kafka 消费者,它将读取消息并执行这些操作,然后使用以下脚本再次发布到 Kafka 主题
生产者配置:
{
"bootstrap.servers": "localhost:9092"
}
Run Code Online (Sandbox Code Playgroud)
我还没有配置任何其他配置,如 queue.buffering.max.messages queue.buffering.max.ms batch.num.messages
我假设这些都将成为配置中的默认值
queue.buffering.max.messages : 100000
queue.buffering.max.ms : 0
batch.num.messages : 10000
Run Code Online (Sandbox Code Playgroud)
我的理解:当内部队列达到 queue.buffering.max.ms 或 batch.num.messages 中的任何一个时,消息将在单独的线程中发布到 Kafka。在我的配置中 queue.buffering.max.ms 是 0,所以当我调用 generate() 时每条消息都会被发布。如果我错了,请纠正我。
我的制作人片段:
def send(topic, message):
p.produce(topic, json.dumps(message), callback=delivery_callback(err, msg))
p.flush()
Run Code Online (Sandbox Code Playgroud)
从这篇文章我了解到,在每条消息后使用刷新,生产者将成为同步生产者。如果我使用上面的脚本,发布到 Kafka 需要大约 45 毫秒
如果我将上面的代码段更改为
def send(topic, message):
p.produce(topic, json.dumps(message), callback=delivery_callback(err, msg))
p.poll(0)
Run Code Online (Sandbox Code Playgroud)
有什么性能会提高吗?你能澄清一下我的理解吗?
谢谢
当消费者在 5 分钟内(默认值 max.poll.interval.ms 300000ms)没有收到消息时,消费者会停止而不退出程序。消费者进程挂起,不再消费任何消息。
记录以下错误消息
MAXPOLL|rdkafka#consumer-1| [thrd:main]: Application maximum poll interval (300000ms) exceeded by 255ms (adjust max.poll.interval.ms for long-running message processing): leaving group
Run Code Online (Sandbox Code Playgroud)
我看到在 confluent-kafka-go 中ErrMaxPollExceeded定义了它,但无法找到它在哪里被提出。
如果出现任何此类错误,为什么程序不退出?
用于 kafka.Consumer 的配置
{
"bootstrap.servers": "private.kafka.host",
"group.id": "foo.bar",
"auto.offset.reset": "earliest",
"enable.auto.commit": false,
}
Run Code Online (Sandbox Code Playgroud) 当我们启动停止消费者服务的进程时,Kafka消费者应用程序抛出ObjectDisposeException。我想了解什么进程破坏了kafka句柄
System.ObjectDisposedException: handle is destroyed
at Confluent.Kafka.Impl.SafeKafkaHandle.ThrowIfHandleClosed()
at Confluent.Kafka.Consumer`2.Consume(Int32 millisecondsTimeout)
at Confluent.Kafka.Consumer`2.Consume(CancellationToken cancellationToken)
Run Code Online (Sandbox Code Playgroud) 我们计划将 AWS MSK 服务用于托管 Kafka 和架构注册表以及 Confluence 的 Kafka Connect 服务来运行我们的连接器(Elasticsearch Sink Connector)。我们计划在 EC2 中运行架构注册表和连接器。
根据 Confluence 团队的说法,如果我们对 Kafka 使用 MSK,他们将无法正式支持 Confluence Schema Registry 和 Kafka Connect。
那么,有谁可以分享一下他们的经验吗?就像 Anybuddy 在生产环境中组合使用 MSK 和 Confluence 服务一样吗?
使用这种组合有风险吗?
是否推荐使用这种组合?
如果我们遇到连接器方面的任何问题,Confluence 社区如何提供支持?
还有其他建议、意见或替代方案吗?
我们已经拥有 Confluence 企业平台许可证,但我们希望拥有托管 Kafka 服务,这就是我们选择 AWS MKS 的原因,因为根据我们的分析,它比 Confluence Cloud 非常经济高效?
请分享您的想法并提前致谢。
谢谢
apache-kafka confluent-schema-registry confluent-cloud aws-msk confluent-platform
是否有任何Python kafka管理客户端可用于从python程序创建主题/删除主题?我发现了一些 python api,但它们都没有可用的 Admin api?
Confluence 有 python admin api 吗?
apache-kafka kafka-consumer-api kafka-producer-api confluent-platform
似乎不可能使用 RegexRoute 用下划线替换主题名称中的所有点,因为 RegexRouter 调用replaceFirstnot replaceAll。有没有解决的办法?我的一个想法是通过变换进行多次传递:
{
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": "10",
"topics": "foo.bar.baz,some.topic",
"s3.region": "us-east-1",
"s3.bucket.name": "bucket",
"s3.part.size": "5242880",
"s3.compression.type": "gzip",
"timezone": "UTC",
"rotate.schedule.interval.ms": "900000",
"flush.size": "1000000",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"timestamp.extractor": "RecordField",
"timestamp.field": "time",
"schema.compatibility": "NONE",
"name": "s3-sink",
"transforms":"replaceFirstDot,replaceSecondDot",
"transforms.replaceFirstDot.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.replaceFirstDot.regex": "\\.",
"transforms.replaceFirstDot.replacement": "_",
"transforms.replaceSecondDot.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.replaceSecondDot.regex": "\\.",
"transforms.replaceSecondDot.replacement": "_"
}
Run Code Online (Sandbox Code Playgroud)
有没有一种简单的方法来包含自定义分区器或转换/路由器?
我正在使用这些安装说明获取融合中心客户端https://docs.confluent.io/current/connect/managing/confluent-hub/client.html
但是,当我开始安装 kafka-connect-elasticsearch 连接器时
confluent-hub install confluentinc/kafka-connect-elasticsearch:latest
我继续收到此错误消息:
Unable to detect Confluent Platform installation. Specify --component-dir and --worker-configs explicitly.
Error: Invalid options or arguments
Run Code Online (Sandbox Code Playgroud)
我在 Mac 上通过 Homebrew 安装了 ElasticSearch 和 Kafka
我正在使用 Kafka 和架构注册表。定义了一个模式,在生产者端使用 confluence 的 KafkaAvroSerializer。一切正常。
另一方面,如果生产者在不遵守架构的情况下发布事件,则发布事件不会出现任何问题。
据了解,Kafka 仅获取序列化的二进制文件,不会检查数据和功能是否按设计工作。
想知道是否有更好的方法来强制执行更强大的模式验证,以便主题不会被不良数据污染?
我已经设置了 Kafka 集群并安装confluent-hub在EC2实例上。我已经下载了confluent-hubtar 文件,解压并将二进制文件放入/usr/local/bin.
当我现在尝试安装任何连接器时,我收到 ClassNotFoundException。
[ec2-user@ip-172-31-88-110 bin]$ confluent-hub install confluentinc/kafka-connect-jdb c:latest --component-dir /opt/connectors --worker-configs /etc/kafka/connect.properties
/usr/local/bin/confluent-hub: line 13: cd: /usr/local/bin/../share/java: No such file or directory
Error: Could not find or load main class io.confluent.connect.hub.cli.ConfluentHubClient
Caused by: java.lang.ClassNotFoundException: io.confluent.connect.hub.cli.ConfluentHubClient
Run Code Online (Sandbox Code Playgroud)
我哪里出错了?
apache-kafka-connect mongodb-kafka-connector confluent-platform