在架构注册表中添加条目的“简单”语法如下:
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}"}' http://localhost:8081/subjects/test-value/versions
Run Code Online (Sandbox Code Playgroud)
然而,当从终端执行此操作时,如果模式很大,这可能会非常繁重且不切实际。有没有一种方便的方法不使用内联模式(即模式内容)来涂抹curl命令,而只传递avro模式文件(avsc)?
我知道有一个python 工具可以做到这一点:
$ python register_schema.py http://localhost:8081 persons-avro person.avsc
Run Code Online (Sandbox Code Playgroud)
我也知道我可以通过 http 请求在 Java 中做到这一点(使用大量样板代码)
但我想知道是否有一种方法可以直接从命令行执行此操作(没有 python,在普通的 bash 中)
我有一个生产者代码,正在向 Kafka 发送消息。直到昨天我才能发送消息。从今天开始,我无法发送消息。不确定是否是版本兼容问题。没有失败或错误消息,代码被执行,但没有发送消息。
以下是 Python 模块版本:
kafka-python==2.0.1
Python 3.8.2
下面是我的代码:
from kafka import KafkaProducer
import logging
logging.basicConfig(level=logging.INFO)
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')
producer.send('Jim_Topic', b'Message from PyCharm')
producer.send('Jim_Topic', key=b'message-two', value=b'This is Kafka-Python')
Run Code Online (Sandbox Code Playgroud)
我也尝试记录行为,但不知道为什么生产者被关闭:
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=127.0.0.1:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: connecting to 127.0.0.1:9092 [('127.0.0.1', 9092) IPv4]
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=127.0.0.1:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: Connection complete.
INFO:kafka.producer.kafka:Closing the Kafka producer with 0 secs timeout.
INFO:kafka.producer.kafka:Proceeding to force close the producer since pending requests could not be completed within timeout 0.
INFO:kafka.producer.kafka:Kafka producer closed
Process …
Run Code Online (Sandbox Code Playgroud) 浏览卡夫卡文档,我发现了这个特殊的配置。leader.imbalance.per.broker.percentage
。
直觉上是什么leader.imbalance.per.broker.percentage
意思?我怎样才能模拟这个配置的工作?
Type: int
Default: 10
Valid Values:
Importance: high
Update Mode: read-only
Run Code Online (Sandbox Code Playgroud)
为什么这个值是10
默认值?
我正在运行一个分布式 Kafka-Broker,其中代理间通信是使用 SASL/SSL 设置的。为此,我调整了此处给出的 JAAS 配置。完成的文件如下所示:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret"
user_alice="alice-secret"
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN;
org.apache.kafka.common.security.scram.ScramLoginModule required;
};
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret";
};
Run Code Online (Sandbox Code Playgroud)
我注意到“KafkaServer”部分有 2 个管理员用户。我也经历了惨痛的教训才知道我两者都需要,但这是为什么呢?我有一种感觉,几个月前我已经读过(并忘记了)原因,但我似乎再也找不到它了。
我已经在 kubernetes 上部署了 strimzi kafka,并且在本地也安装了 kube。但每次我想要在 kafka 中创建一个新主题时,我都需要通过 rancher 导入 yaml 文件并提供主题名称来创建一个主题。
有没有办法直接通过 kubectl 命令创建 kafka 主题?
这些是我用来运行 kafka 的命令:
Producer: kubectl run kafka-producer1 -ti --image=strimzi/kafka:0.18.0-kafka-2.4.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list 11.23.41.32:31025 --topic topic-name
Consumer: kubectl run kafka-consumer1 -ti --image=strimzi/kafka:0.18.0-kafka-2.4.0 --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server 11.23.41.32:31025 --topic topic-name --from-beginning
如果我创建一个新集群,其中包含 1 个代理、1 个主题、1 个分区,复制因子为 3,那么会发生什么?它会在该单个代理下创建 3 个副本(分区)?如果是,如何选举领导人?
我的应用程序控制台日志显示了以下很多内容:
%3|1602097315.970|FAIL|rdkafka#consumer-2| [thrd:kfkqaapq0002d.ch.me.com:9092/bootstrap]: kfkqaapq0002d.ch.me.com:9092/bootstrap: Failed to resolve 'kfkqaapq0002d.ch.me.com:9092': No such host is known. (after 42ms in state CONNECT)
%3|1602097315.970|FAIL|rdkafka#consumer-3| [thrd:kfkqaapq0002d.ch.me.com:9092/bootstrap]: kfkqaapq0002d.ch.me.com:9092/bootstrap: Failed to resolve 'kfkqaapq0002d.ch.me.com:9092': No such host is known. (after 41ms in state CONNECT)
%3|1602097315.972|FAIL|rdkafka#producer-1| [thrd:kfkqaapq0003d.ch.me.com:9092/bootstrap]: kfkqaapq0003d.ch.me.com:9092/bootstrap: Failed to resolve 'kfkqaapq0003d.ch.me.com:9092': No such host is known. (after 48ms in state CONNECT)
%3|1602097315.973|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: kfkqaapq0003d.ch.me.com:9092/bootstrap: Failed to resolve 'kfkqaapq0003d.ch.me.com:9092': No such host is known. (after 48ms in state CONNECT)
%3|1602097316.459|FAIL|rdkafka#producer-1| [thrd:kfkqaapq0001d.ch.me.com:9092/bootstrap]: kfkqaapq0001d.ch.me.com:9092/bootstrap: Failed to resolve 'kfkqaapq0001d.ch.me.com:9092': No such host …
Run Code Online (Sandbox Code Playgroud) 所以我正在学习 Kafka,并尝试在 docker compose 文件的帮助下在本地环境中设置它。我正在遵循以下示例:
https://docs.confluence.io/5.0.0/installation/docker/docs/installation/connect-avro-jdbc.html
按照这个例子,我已经完成了相当多的工作,直到进入步骤 8 的后半部分。
当尝试在 Kafka Connect 容器内部执行以下操作时kafka-avro-console-consumer --bootstrap-server kafka:9092 --topic quickstart-jdbc-test --from-beginning --max-messages 10
,我收到以下消息,但无法确定它尝试连接的内容:
[2020-10-07 20:45:44,784] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2020-10-07 20:45:45,431] INFO ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [kafka:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = console-consumer-7022
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = …
Run Code Online (Sandbox Code Playgroud) 我正在开发一个推送通知,向客户端发送许多消息。消息被发布到主题中,订阅者从同一主题中读取消息。如果在从主题偏移量读取消息后立即出现错误,即使我无法发送消息,我的订阅者也需要读取下一条消息并发送它。我所说的错误是指服务器停机或出现严重问题。
如何阅读带有确认信息的消息?
我有一个简单的流处理器(不是消费者/生产者),如下所示(Kotlin)
\n@Bean\nfun processFoo():Function<KStream<FooName, FooAddress>, KStream<FooName, FooAddressPlus>> {\n return Function { input-> input.map { key, value ->\n println("\\nPAYLOAD KEY: ${key.name}\\n");\n println("\\nPAYLOAD value: ${value.address}\\n");\n val output = FooAddressPlus()\n output.address = value.address\n output.name = value.name\n output.plus = "$value.name-$value.address"\n KeyValue(key, output)\n }}\n}\n
Run Code Online (Sandbox Code Playgroud)\n这些类FooName
、FooAddress
和 与FooAddressPlus
处理器位于同一包中。\nHere\xe2\x80\x99s 我的配置文件:
spring.cloud.stream.kafka.binder:\n brokers: localhost:9093\n\nspring.cloud.stream.function.definition: processFoo\n\nspring.cloud.stream.kafka.streams.binder.functions.processFoo.applicationId: foo-processor\nspring.cloud.stream.bindings.processFoo-in-0:\n destination: foo.processor\nspring.cloud.stream.bindings.processFoo-out-0:\n destination: foo.processor.out\n\nspring.cloud.stream.kafka.streams.binder:\n deserializationExceptionHandler: logAndContinue\n configuration:\n default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde\n default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde\n commit.interval.ms: 1000\n
Run Code Online (Sandbox Code Playgroud)\n运行处理器时出现此错误:
\nThe class '<here_comes_package>.FooAddress' is not in the …
Run Code Online (Sandbox Code Playgroud) apache-kafka ×10
.net ×1
.net-core ×1
c# ×1
go ×1
jaas ×1
java ×1
kafka-python ×1
kotlin ×1
kubectl ×1
kubernetes ×1
python ×1
sasl ×1
strimzi ×1