我在使用官方 Confluent Kafka Python API 时遇到错误:
我订阅:
kafka_consumer.subscribe(topics=["my-avro-topic"], on_assign=on_assign_callback, on_revoke=on_revoke_callback)
Run Code Online (Sandbox Code Playgroud)
使用回调:
def on_assign_callback(consumer, topic_partitions):
for topic_partition in topic_partitions:
print("without position. topic={}. partition={}. offset={}. error={}".format(topic_partition.topic, topic_partition.partition,
topic_partition.offset, topic_partition.error))
topic_partitions_with_offsets = consumer.position(topic_partitions)
print("assigned to {}->{} partitions".format(len(topic_partitions), len(topic_partitions_with_offsets)))
for topic_partition in topic_partitions_with_offsets:
print("with position. topic={}. partition={}. offset={}. error={}".format(topic_partition.topic, topic_partition.partition,
topic_partition.offset, topic_partition.error))
Run Code Online (Sandbox Code Playgroud)
产生控制台输出:
without position. topic=my-avro-topic. partition=0. offset=-1001. error=None
assigned to 1->1 partitions
with position. topic=my-avro-topic. partition=0. offset=-1001. error=KafkaError{code=_UNKNOWN_PARTITION,val=-190,str="(null)"}
Run Code Online (Sandbox Code Playgroud)
有人可以解释一下吗?为什么我会收到未知分区的回调通知?类似的代码使用 Java API 可以完美运行。
我按照此处提到的教程重新设置了我的 kafka 制作人:https : //www.digitalocean.com/community/tutorials/how-to-install-apache-kafka-on-ubuntu-14-04
我正在使用 cron 和以下带有 IP 的服务器上的脚本向生产者推送一些事件:1.2.3.4
#!/usr/bin/env python
import threading, logging, time
import multiprocessing
import requests
import datetime
import json
from kafka import KafkaProducer
class CheckApis():
apisList = {"a": "https://test.eng.com/"}
kafkaProducer = "1.2.3.4:9092"
kafkaTopic = "sometopic"
producer = KafkaProducer(bootstrap_servers=kafkaProducer)
for key, value in apisList.items():
headers = {};
response = requests.request("GET", value, headers=headers)
message = {"app_name": key, "status": response.status_code, "message": "none", "timestamp": str(datetime.datetime.utcnow())}
producer.send(kafkaTopic, json.dumps(message).encode('utf-8'));
print (response.text)
print (response.status_code)
producer.close()
Run Code Online (Sandbox Code Playgroud)
这很有效,我可以使用以下命令查看推送的事件:
~/kafka/bin/kafka-console-consumer.sh --zookeeper 1.2.3.4:2181 --topic sometopic …Run Code Online (Sandbox Code Playgroud) python node.js apache-kafka kafka-consumer-api apache-kafka-connect
我已经启动了一个 Nifi 进程(消费 Kafka)并将其连接到一个主题。它正在运行,但我无法(不知道)在哪里可以查看消息?
我希望能够通过属性读取主题,而无需在 Kafka 侦听器注释上指定任何内容。不使用 Spring Boot。
我尝试通过“主题”键直接从属性对象读取主题。这给出了一个错误:IllegalStateException:topics, topicPattern, or topicPartitions must be provided.
// some class
@KafkaListener
public void listener(List<String> messages) {
System.out.print(messages);
}
//some other class
@Bean
public ConsumerFactory<String, String> consumerFactory(Properties topicProp) {
return new DefaultKafkaConsumerFactory(topicProp);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
Properties prop = new Properties();
prop.setProperty("topics", "my-custom-topic");
factory.setConsumerFactory(this.consumerFactory(prop));
return factory;
}
Is this possible?
Run Code Online (Sandbox Code Playgroud) 我正在学习 Kafka 并尝试为我最近的搜索应用程序创建一个主题。推送到 kafka 主题的数据被认为是一个很大的数字。
我的 kafka 集群有 3 个代理,并且已经为其他需求创建了主题。
现在我应该为最近的搜索主题选择多少个分区?如果我没有明确提供分区号怎么办?选择分区号需要考虑哪些事项?
apache-kafka kafka-consumer-api kafka-producer-api kafka-topic
所以,我是 Kafka 的新手,我已经阅读了一段时间。我在 confluent 上找到了这些信息。
https://docs.confluent.io/current/streams/architecture.html
所以我从这里了解到,假设我有一个名为 plain_text 的主题,我只是将一堆记录作为纯文本发送,而我只有一个具有单个主题和单个分区的代理。我现在启动 2 个消费者实例 ConsumerA 和 ConsumerB。由于我的分区计数小于消费者计数,因此只有一个消费者应该主动消费消息,而另一个消费者则处于空闲状态。如果我错了,请纠正我。
我使用 kafka-console-* 脚本进行了测试
bin/zookeeper-server-start.sh config/zookeeper.properties
Run Code Online (Sandbox Code Playgroud)
bin/kafka-server-start.sh config/server.properties
Run Code Online (Sandbox Code Playgroud)
bin/kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--replication-factor 1 \
--partitions 1 \
--topic plain_text
Run Code Online (Sandbox Code Playgroud)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic plain_text
Run Code Online (Sandbox Code Playgroud)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic plain_text \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property group.id=test_group \
Run Code Online (Sandbox Code Playgroud)
因此,两个消费者中的一个应该拥有该单个分区(如果我错了,请再次纠正我),但是我在生产者控制台上生成的任何内容在两个消费者控制台上都是可见的。为什么两个消费者都在使用来自单个分区的消息。我是否遗漏了什么或对 kafka-console-* 脚本执行了不同的规则。
apache-kafka kafka-consumer-api kafka-producer-api kafka-partition
我有一个使用 spring-kafka 配置的 Springboot 应用程序,我想在其中处理收听主题时可能发生的各种错误。如果由于反序列化或任何其他异常而丢失/无法使用任何消息,则会重试 2 次,之后该消息应记录到错误文件中。我有两种可以遵循的方法:-
第一种方法(使用 SeekToCurrentErrorHandler 和 DeadLetterPublishingRecoverer):-
@Autowired
KafkaTemplate<String,Object> template;
@Bean(name = "kafkaSourceProvider")
public ConcurrentKafkaListenerContainerFactory<K, V> consumerFactory() {
Map<String, Object> config = appProperties.getSource()
.getProperties();
ConcurrentKafkaListenerContainerFactory<K, V> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(config));
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
(r, e) -> {
if (e instanceof FooException) {
return new TopicPartition(r.topic() + ".DLT", r.partition());
}
});
ErrorHandler errorHandler = new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 2L));
factory.setErrorHandler(errorHandler);
return factory;
}
Run Code Online (Sandbox Code Playgroud)
但为此我们需要添加主题(一个新的 .DLT 主题),然后我们可以将其记录到文件中。
@Bean
public KafkaAdmin admin() {
Map<String, Object> …Run Code Online (Sandbox Code Playgroud) 我有一个负载不是恒定的系统。我们每天可能会收到 1000 个请求,或者根本没有请求。
我们使用 Kafka 在服务之间传递请求。我们保留了 Kafka 消费者的平均数量以减少产生的成本。现在,如果当天没有收到请求,我的 Kafka 消费者将处于理想状态,如果收到太多请求,则会出现延迟。
我们希望将这些消费者保持在自动缩放模式,这样,如果请求数量激增,我的服务器(Kafka 消费者)数量将会增加。一旦请求数量减少,我们将删除服务器。因此,Kafka分区必须相应增加或减少
apache-kafka ×7
java ×2
python ×2
spring-kafka ×2
apache-nifi ×1
kafka-topic ×1
node.js ×1
spring ×1
spring-boot ×1