我正在使用 MirrorMaker2 进行灾难恢复。
Kafka 2.7 应支持 自动消费者偏移同步
这是我正在使用的 yaml 文件(我使用 strimzi 来创建它)
所有源集群主题都会复制到目标集群中。另外...checkpoint.internal主题是在目标集群中创建的,其中包含同步的所有源集群偏移量,但我没有看到这些偏移量被转换为目标集群_consumer_offsets主题,这意味着我何时将在目标中启动消费者(同一消费者组) cluster 它将从头开始读取消息。
我的期望是,在允许自动消费者偏移量后,同步来自翻译并存储在目标集群中的 _consumer_offsets 主题中的源集群的所有消费者偏移量。
有人可以澄清我的期望是否正确,如果不正确,它应该如何运作。
我将 Kafka 与 strimzi 运算符一起使用,创建了一个 Kafka 集群,并使用 yml 文件部署了 Kafka 连接。但在此之后我完全空白下一步该做什么。我读到 Kafka connect 用于将数据从源复制到 Kafka 集群或从 Kafka 集群复制到另一个目的地。我想使用 Kafka connect 将数据从文件复制到 Kafka 集群的任何主题。任何人都可以帮助我,我该怎么做,我正在共享创建 Kafka 连接集群的 yml 文件。
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
name: my-connect-cluster
# annotations:
# # use-connector-resources configures this KafkaConnect
# # to use KafkaConnector resources to avoid
# # needing to call the Connect REST API directly
# strimzi.io/use-connector-resources: "true"
spec:
version: 2.6.0
replicas: 1
bootstrapServers: my-cluster-kafka-bootstrap:9093
tls:
trustedCertificates:
- secretName: my-cluster-cluster-ca-cert
certificate: ca.crt
config:
group.id: connect-cluster …
Run Code Online (Sandbox Code Playgroud) 我使用 Kafka Strimzi 运算符在 Kubernetes 上运行 Kafka。我通过使用以下内容配置我的消费者来使用增量粘性重新平衡策略:
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
org.apache.kafka.clients.consumer.CooperativeStickyAssignor.class.getName()
Run Code Online (Sandbox Code Playgroud)
每次我扩展消费者组中的消费者时,该组中的所有现有消费者都会生成以下异常
线程“main”中出现异常 org.apache.kafka.common.errors.RebalanceInProgressException:由于使用者正在进行自动分区分配的重新平衡,因此无法完成偏移提交。您可以尝试通过调用 poll() 来完成重新平衡,然后重试该操作
知道导致此异常的原因和/或如何解决它吗?
谢谢。
我想在 Kafka 之上使用 HTTP 代理。我看到两个具有相同目的的项目:
我使用 strimzi 运算符在 Kubernetes 上启动 Kafka。
它们有何不同?什么时候使用哪一个?
嘿,我正在使用 Kafka Strimzi。我使用以下 yml 文件创建了我的 kafkaTopic 和 KafkaUser:
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
name: my-user
labels:
strimzi.io/cluster: my-cluster
spec:
authentication:
type: tls
authorization:
type: simple
acls:
# Example consumer Acls for topic my-topic using consumer group my-group
- resource:
type: topic
name: my-topic
patternType: literal
operation: Read
host: "*"
- resource:
type: topic
name: my-topic
patternType: literal
operation: Describe
host: "*"
- resource:
type: group
name: my-group
patternType: literal
operation: Read
host: "*"
# Example Producer Acls for topic my-topic …
Run Code Online (Sandbox Code Playgroud) 我正在尝试使用kube-prometheus-stack helm 图表来监控Strimzi。我已经按照Strimzi官方文档中的教程进行了设置。在本教程中,他们都使用 Podmonitors 和 Prometheus 配置来获取一些指标。但我不太明白为什么我需要为某些指标设置 Podmonitor 并为其他指标在 prometheus.prometheusSpec.additionalScrapeConfigs 中添加作业。有人可以向我解释其中的区别吗?
Kafka elasticsearch 连接器“confluenceinc-kafka-connect-elasticsearch-5.5.0”无法在本地工作。
"java.lang.NoClassDefFoundError: com/google/common/collect/ImmutableSet\n\tat io.searchbox.client.AbstractJestClient.<init>(AbstractJestClient.java:38)\n\tat io.searchbox.client.http.JestHttpClient.<init>(JestHttpClient.java:43)\n\tat io.searchbox.client.JestClientFactory.getObject(JestClientFactory.java:51)\n\tat io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.<init>(JestElasticsearchClient.java:149)\n\tat io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.<init>(JestElasticsearchClient.java:141)\n\tat io.confluent.connect.elasticsearch.ElasticsearchSinkTask.start(ElasticsearchSinkTask.java:122)\n\tat io.confluent.connect.elasticsearch.ElasticsearchSinkTask.start(ElasticsearchSinkTask.java:51)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:305)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\n"
Run Code Online (Sandbox Code Playgroud)
我还在同一路径中使用 mssql 连接器和 s3 连接器插件;它们可以工作,但 elasticsearch 插件给出 noclassfound 错误。这是我在工作人员中的文件夹结构:
[kafka@mssql-minio-connect-cluster-connect-d9859784f-ffj8r plugins]$ ls
confluentinc-kafka-connect-elasticsearch-5.5.0 confluentinc-kafka-connect-s3-5.5.0 debezium-connector-sqlserver kafka-connect-shell-sink-5.1.0
[kafka@mssql-minio-connect-cluster-connect-d9859784f-ffj8r plugins]$ ls -l
total 16
drwxrwxr-x 2 root root 4096 May 25 22:15 confluentinc-kafka-connect-elasticsearch-5.5.0
drwxrwxr-x 5 root root 4096 May 15 02:26 confluentinc-kafka-connect-s3-5.5.0
drwxrwxr-x 2 root root 4096 May 15 02:26 debezium-connector-sqlserver
drwxrwxr-x 4 root root 4096 May 15 02:26 kafka-connect-shell-sink-5.1.0
[kafka@mssql-minio-connect-cluster-connect-d9859784f-ffj8r …
Run Code Online (Sandbox Code Playgroud) 我已经在 kubernetes 上部署了 strimzi kafka,并且在本地也安装了 kube。但每次我想要在 kafka 中创建一个新主题时,我都需要通过 rancher 导入 yaml 文件并提供主题名称来创建一个主题。
有没有办法直接通过 kubectl 命令创建 kafka 主题?
这些是我用来运行 kafka 的命令:
Producer: kubectl run kafka-producer1 -ti --image=strimzi/kafka:0.18.0-kafka-2.4.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list 11.23.41.32:31025 --topic topic-name
Consumer: kubectl run kafka-consumer1 -ti --image=strimzi/kafka:0.18.0-kafka-2.4.0 --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server 11.23.41.32:31025 --topic topic-name --from-beginning
strimzi ×8
apache-kafka ×7
kubernetes ×5
kafka-rest ×1
kafka-topic ×1
kubectl ×1
prometheus ×1