标签: apache-kafka

使用文件 (Curl) 在 Kafka Schema 注册表中创建新条目

在架构注册表中添加条目的“简单”语法如下:

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}"}' http://localhost:8081/subjects/test-value/versions
Run Code Online (Sandbox Code Playgroud)

然而,当从终端执行此操作时,如果模式很大,这可能会非常繁重且不切实际。有没有一种方便的方法不使用内联模式(即模式内容)来涂抹curl命令,而只传递avro模式文件(avsc)?

我知道有一个python 工具可以做到这一点:

$ python register_schema.py http://localhost:8081 persons-avro person.avsc
Run Code Online (Sandbox Code Playgroud)

我也知道我可以通过 http 请求在 Java 中做到这一点(使用大量样板代码)

但我想知道是否有一种方法可以直接从命令行执行此操作(没有 python,在普通的 bash 中)

apache-kafka confluent-schema-registry

1
推荐指数
1
解决办法
2948
查看次数

无法将消息发送到 Kafka Python 中的主题

我有一个生产者代码,正在向 Kafka 发送消息。直到昨天我才能发送消息。从今天开始,我无法发送消息。不确定是否是版本兼容问题。没有失败或错误消息,代码被执行,但没有发送消息。

以下是 Python 模块版本:

kafka-python==2.0.1
Python 3.8.2

下面是我的代码:

from kafka import KafkaProducer
import logging
logging.basicConfig(level=logging.INFO)

producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')
producer.send('Jim_Topic', b'Message from PyCharm')
producer.send('Jim_Topic', key=b'message-two', value=b'This is Kafka-Python')
Run Code Online (Sandbox Code Playgroud)

我也尝试记录行为,但不知道为什么生产者被关闭:

INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=127.0.0.1:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: connecting to 127.0.0.1:9092 [('127.0.0.1', 9092) IPv4]
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=127.0.0.1:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: Connection complete.
INFO:kafka.producer.kafka:Closing the Kafka producer with 0 secs timeout.
INFO:kafka.producer.kafka:Proceeding to force close the producer since pending requests could not be completed within timeout 0.
INFO:kafka.producer.kafka:Kafka producer closed

Process …
Run Code Online (Sandbox Code Playgroud)

python apache-kafka kafka-python

1
推荐指数
1
解决办法
1万
查看次数

Apache Kafka 配置中“leader.imbalance.per.broker.percentage”的含义

浏览卡夫卡文档,我发现了这个特殊的配置。leader.imbalance.per.broker.percentage

直觉上是什么leader.imbalance.per.broker.percentage意思?我怎样才能模拟这个配置的工作?

Type:   int
Default:    10
Valid Values:   
Importance: high
Update Mode:    read-only
Run Code Online (Sandbox Code Playgroud)

为什么这个值是10默认值?

apache-kafka

1
推荐指数
1
解决办法
2766
查看次数

使用 SASL 进行 Kafka 身份验证 - 重复的管理员用户?

我正在运行一个分布式 Kafka-Broker,其中代理间通信是使用 SASL/SSL 设置的。为此,我调整了此处给出的 JAAS 配置。完成的文件如下所示:

KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin-secret"
  user_admin="admin-secret"
  user_alice="alice-secret"
  security.protocol=SASL_PLAINTEXT
  sasl.mechanism=PLAIN;

  org.apache.kafka.common.security.scram.ScramLoginModule required;
};

Client {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin-secret";
};
Run Code Online (Sandbox Code Playgroud)

我注意到“KafkaServer”部分有 2 个管理员用户。我也经历了惨痛的教训才知道我两者都需要,但这是为什么呢?我有一种感觉,几个月前我已经读过(并忘记了)原因,但我似乎再也找不到它了。

sasl jaas apache-kafka

1
推荐指数
1
解决办法
2091
查看次数

是否有 kubectl 命令可以在不使用 import yaml 文件选项的情况下创建 kafka 主题?

我已经在 kubernetes 上部署了 strimzi kafka,并且在本地也安装了 kube。但每次我想要在 kafka 中创建一个新主题时,我都需要通过 rancher 导入 yaml 文件并提供主题名称来创建一个主题。

有没有办法直接通过 kubectl 命令创建 kafka 主题?

这些是我用来运行 kafka 的命令:

Producer: kubectl run kafka-producer1 -ti --image=strimzi/kafka:0.18.0-kafka-2.4.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list 11.23.41.32:31025 --topic topic-name

Consumer: kubectl run kafka-consumer1 -ti --image=strimzi/kafka:0.18.0-kafka-2.4.0 --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server 11.23.41.32:31025 --topic topic-name --from-beginning

apache-kafka kubernetes kubectl strimzi

1
推荐指数
1
解决办法
9131
查看次数

如果我在 kafka 中有一个复制因子为 3 的代理怎么办?

如果我创建一个新集群,其中包含 1 个代理、1 个主题、1 个分区,复制因子为 3,那么会发生什么?它会在该单个代理下创建 3 个副本(分区)?如果是,如何选举领导人?

apache-kafka kafka-consumer-api kafka-producer-api

1
推荐指数
1
解决办法
2121
查看次数

如何从使用 librdkafka.redist 作为依赖项的 C# 应用程序访问 librdkafka.redist 日志?

我的应用程序控制台日志显示了以下很多内容:

%3|1602097315.970|FAIL|rdkafka#consumer-2| [thrd:kfkqaapq0002d.ch.me.com:9092/bootstrap]: kfkqaapq0002d.ch.me.com:9092/bootstrap: Failed to resolve 'kfkqaapq0002d.ch.me.com:9092': No such host is known.  (after 42ms in state CONNECT)
%3|1602097315.970|FAIL|rdkafka#consumer-3| [thrd:kfkqaapq0002d.ch.me.com:9092/bootstrap]: kfkqaapq0002d.ch.me.com:9092/bootstrap: Failed to resolve 'kfkqaapq0002d.ch.me.com:9092': No such host is known.  (after 41ms in state CONNECT)
%3|1602097315.972|FAIL|rdkafka#producer-1| [thrd:kfkqaapq0003d.ch.me.com:9092/bootstrap]: kfkqaapq0003d.ch.me.com:9092/bootstrap: Failed to resolve 'kfkqaapq0003d.ch.me.com:9092': No such host is known.  (after 48ms in state CONNECT)
%3|1602097315.973|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: kfkqaapq0003d.ch.me.com:9092/bootstrap: Failed to resolve 'kfkqaapq0003d.ch.me.com:9092': No such host is known.  (after 48ms in state CONNECT)
%3|1602097316.459|FAIL|rdkafka#producer-1| [thrd:kfkqaapq0001d.ch.me.com:9092/bootstrap]: kfkqaapq0001d.ch.me.com:9092/bootstrap: Failed to resolve 'kfkqaapq0001d.ch.me.com:9092': No such host …
Run Code Online (Sandbox Code Playgroud)

.net c# apache-kafka .net-core confluent-platform

1
推荐指数
1
解决办法
1226
查看次数

Docker Kafka Avro 控制台消费者 - 连接被拒绝

所以我正在学习 Kafka,并尝试在 docker compose 文件的帮助下在本地环境中设置它。我正在遵循以下示例:

https://docs.confluence.io/5.0.0/installation/docker/docs/installation/connect-avro-jdbc.html

按照这个例子,我已经完成了相当多的工作,直到进入步骤 8 的后半部分。

当尝试在 Kafka Connect 容器内部执行以下操作时kafka-avro-console-consumer --bootstrap-server kafka:9092 --topic quickstart-jdbc-test --from-beginning --max-messages 10,我收到以下消息,但无法确定它尝试连接的内容:

[2020-10-07 20:45:44,784] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2020-10-07 20:45:45,431] INFO ConsumerConfig values:
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [kafka:9092]
        check.crcs = true
        client.id =
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = true
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = console-consumer-7022
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka docker-compose confluent-schema-registry

1
推荐指数
1
解决办法
2353
查看次数

在 Confluence kafka go 中读取来自 kafka 主题的消息时如何使用确认?

我正在开发一个推送通知,向客户端发送许多消息。消息被发布到主题中,订阅者从同一主题中读取消息。如果在从主题偏移量读取消息后立即出现错误,即使我无法发送消息,我的订阅者也需要读取下一条消息并发送它。我所说的错误是指服务器停机或出现严重问题。

如何阅读带有确认信息的消息?

producer-consumer go apache-kafka

1
推荐指数
1
解决办法
4395
查看次数

Spring Cloud Stream与Kafka Streams Binder:如何为流处理器设置`trusted.packages`(这与消费者和生产者不同)

我有一个简单的流处理器(不是消费者/生产者),如下所示(Kotlin)

\n
@Bean\nfun processFoo():Function<KStream<FooName, FooAddress>, KStream<FooName, FooAddressPlus>> {\n    return Function { input-> input.map { key, value ->\n        println("\\nPAYLOAD KEY: ${key.name}\\n");\n        println("\\nPAYLOAD value: ${value.address}\\n");\n        val output = FooAddressPlus()\n        output.address = value.address\n        output.name = value.name\n        output.plus = "$value.name-$value.address"\n        KeyValue(key, output)\n    }}\n}\n
Run Code Online (Sandbox Code Playgroud)\n

这些类FooNameFooAddress和 与FooAddressPlus处理器位于同一包中。\nHere\xe2\x80\x99s 我的配置文件:

\n
spring.cloud.stream.kafka.binder:\n  brokers: localhost:9093\n\nspring.cloud.stream.function.definition: processFoo\n\nspring.cloud.stream.kafka.streams.binder.functions.processFoo.applicationId: foo-processor\nspring.cloud.stream.bindings.processFoo-in-0:\n  destination: foo.processor\nspring.cloud.stream.bindings.processFoo-out-0:\n  destination: foo.processor.out\n\nspring.cloud.stream.kafka.streams.binder:\n  deserializationExceptionHandler: logAndContinue\n  configuration:\n    default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde\n    default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde\n    commit.interval.ms: 1000\n
Run Code Online (Sandbox Code Playgroud)\n

运行处理器时出现此错误:

\n
The class '<here_comes_package>.FooAddress' is not in the …
Run Code Online (Sandbox Code Playgroud)

kotlin apache-kafka spring-cloud-stream

1
推荐指数
1
解决办法
1669
查看次数