标签: kafka-consumer-api

用于 Kafka 的 Confluent Python API

我在使用官方 Confluent Kafka Python API 时遇到错误:

我订阅:

kafka_consumer.subscribe(topics=["my-avro-topic"], on_assign=on_assign_callback, on_revoke=on_revoke_callback)
Run Code Online (Sandbox Code Playgroud)

使用回调:

def on_assign_callback(consumer, topic_partitions):
for topic_partition in topic_partitions:
    print("without position. topic={}. partition={}. offset={}. error={}".format(topic_partition.topic, topic_partition.partition,
                                   topic_partition.offset, topic_partition.error))

topic_partitions_with_offsets = consumer.position(topic_partitions)
print("assigned to {}->{} partitions".format(len(topic_partitions), len(topic_partitions_with_offsets)))

for topic_partition in topic_partitions_with_offsets:
    print("with position. topic={}. partition={}. offset={}. error={}".format(topic_partition.topic, topic_partition.partition,
                                   topic_partition.offset, topic_partition.error))
Run Code Online (Sandbox Code Playgroud)

产生控制台输出:

without position. topic=my-avro-topic. partition=0. offset=-1001. error=None
assigned to 1->1 partitions
with position. topic=my-avro-topic. partition=0. offset=-1001. error=KafkaError{code=_UNKNOWN_PARTITION,val=-190,str="(null)"}
Run Code Online (Sandbox Code Playgroud)

有人可以解释一下吗?为什么我会收到未知分区的回调通知?类似的代码使用 Java API 可以完美运行。

python apache-kafka kafka-consumer-api confluent-platform

0
推荐指数
1
解决办法
2512
查看次数

无法远程使用 kafka 事件错误:connect ECONNREFUSED 5.6.7.8:9092

我按照此处提到的教程重新设置了我的 kafka 制作人:https : //www.digitalocean.com/community/tutorials/how-to-install-apache-kafka-on-ubuntu-14-04

我正在使用 cron 和以下带有 IP 的服务器上的脚本向生产者推送一些事件:1.2.3.4

#!/usr/bin/env python
import threading, logging, time
import multiprocessing
import requests
import datetime
import json
from kafka import KafkaProducer

class CheckApis():
    apisList = {"a": "https://test.eng.com/"}
    kafkaProducer = "1.2.3.4:9092"
    kafkaTopic = "sometopic"
    producer = KafkaProducer(bootstrap_servers=kafkaProducer)
    for key, value in apisList.items():
        headers = {};
        response = requests.request("GET", value, headers=headers)
        message = {"app_name": key, "status": response.status_code, "message": "none", "timestamp": str(datetime.datetime.utcnow())}
        producer.send(kafkaTopic, json.dumps(message).encode('utf-8'));
        print (response.text)
        print (response.status_code)
    producer.close()
Run Code Online (Sandbox Code Playgroud)

这很有效,我可以使用以下命令查看推送的事件:

~/kafka/bin/kafka-console-consumer.sh --zookeeper 1.2.3.4:2181 --topic sometopic …
Run Code Online (Sandbox Code Playgroud)

python node.js apache-kafka kafka-consumer-api apache-kafka-connect

0
推荐指数
1
解决办法
1815
查看次数

如何在Nifi中查看Kafka的消费消息?

我已经启动了一个 Nifi 进程(消费 Kafka)并将其连接到一个主题。它正在运行,但我无法(不知道)在哪里可以查看消息?

apache-kafka kafka-consumer-api apache-nifi

0
推荐指数
1
解决办法
4462
查看次数

如何使用工厂为特定主题配置 Spring Kafka Listener?

我希望能够通过属性读取主题,而无需在 Kafka 侦听器注释上指定任何内容。不使用 Spring Boot。

我尝试通过“主题”键直接从属性对象读取主题。这给出了一个错误:IllegalStateException:topics, topicPattern, or topicPartitions must be provided.

// some class
@KafkaListener
public void listener(List<String> messages) {
  System.out.print(messages);
}

//some other class
@Bean
public ConsumerFactory<String, String> consumerFactory(Properties topicProp) {
  return new DefaultKafkaConsumerFactory(topicProp);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
  ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();

  Properties prop = new Properties();
  prop.setProperty("topics", "my-custom-topic");

  factory.setConsumerFactory(this.consumerFactory(prop));
  return factory;
}

Is this possible?
Run Code Online (Sandbox Code Playgroud)

java spring apache-kafka kafka-consumer-api spring-kafka

0
推荐指数
1
解决办法
2876
查看次数

kafka 主题中理想的分区数是多少?

我正在学习 Kafka 并尝试为我最近的搜索应用程序创建一个主题。推送到 kafka 主题的数据被认为是一个很大的数字。

我的 kafka 集群有 3 个代理,并且已经为其他需求创建了主题。

现在我应该为最近的搜索主题选择多少个分区?如果我没有明确提供分区号怎么办?选择分区号需要考虑哪些事项?

apache-kafka kafka-consumer-api kafka-producer-api kafka-topic

0
推荐指数
1
解决办法
2340
查看次数

在控制台模式下运行时,Kafka 的并行概念是否适用?

所以,我是 Kafka 的新手,我已经阅读了一段时间。我在 confluent 上找到了这些信息。

https://docs.confluent.io/current/streams/architecture.html

所以我从这里了解到,假设我有一个名为 plain_text 的主题,我只是将一堆记录作为纯文本发送,而我只有一个具有单个主题和单个分区的代理。我现在启动 2 个消费者实例 ConsumerA 和 ConsumerB。由于我的分区计数小于消费者计数,因此只有一个消费者应该主动消费消息,而另一个消费者则处于空闲状态。如果我错了,请纠正我。

我使用 kafka-console-* 脚本进行了测试

启动一个zookeeper集群

bin/zookeeper-server-start.sh config/zookeeper.properties
Run Code Online (Sandbox Code Playgroud)

在 localhost:9092 上启动一个 kafka 代理

bin/kafka-server-start.sh config/server.properties
Run Code Online (Sandbox Code Playgroud)

创建一个带有一个分区的主题纯文本

bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic plain_text
Run Code Online (Sandbox Code Playgroud)

开始制作人

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic plain_text
Run Code Online (Sandbox Code Playgroud)

启动属于同一组的 2 个消费者(运行相同的命令两次)

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic plain_text \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property group.id=test_group \

Run Code Online (Sandbox Code Playgroud)

因此,两个消费者中的一个应该拥有该单个分区(如果我错了,请再次纠正我),但是我在生产者控制台上生成的任何内容在两个消费者控制台上都是可见的。为什么两个消费者都在使用来自单个分区的消息。我是否遗漏了什么或对 kafka-console-* 脚本执行了不同的规则。

apache-kafka kafka-consumer-api kafka-producer-api kafka-partition

0
推荐指数
1
解决办法
63
查看次数

Kafka Consumer 中更好的错误处理方式

我有一个使用 spring-kafka 配置的 Springboot 应用程序,我想在其中处理收听主题时可能发生的各种错误。如果由于反序列化或任何其他异常而丢失/无法使用任何消息,则会重试 2 次,之后该消息应记录到错误文件中。我有两种可以遵循的方法:-

第一种方法(使用 SeekToCurrentErrorHandler 和 DeadLetterPublishingRecoverer):-

@Autowired
KafkaTemplate<String,Object> template;

@Bean(name = "kafkaSourceProvider")
public ConcurrentKafkaListenerContainerFactory<K, V> consumerFactory() {
        Map<String, Object> config = appProperties.getSource()
                .getProperties();
        ConcurrentKafkaListenerContainerFactory<K, V> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(config));

        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
                (r, e) -> {
                    if (e instanceof FooException) {
                        return new TopicPartition(r.topic() + ".DLT", r.partition());
                    }
                });
        ErrorHandler errorHandler = new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 2L));

        factory.setErrorHandler(errorHandler);
        return factory;
    }
Run Code Online (Sandbox Code Playgroud)

但为此我们需要添加主题(一个新的 .DLT 主题),然后我们可以将其记录到文件中。

@Bean
    public KafkaAdmin admin() {
        Map<String, Object> …
Run Code Online (Sandbox Code Playgroud)

java spring-boot kafka-consumer-api spring-kafka

0
推荐指数
1
解决办法
2万
查看次数

动态增加或减少Kafka分区

我有一个负载不是恒定的系统。我们每天可能会收到 1000 个请求,或者根本没有请求。

我们使用 Kafka 在服务之间传递请求。我们保留了 Kafka 消费者的平均数量以减少产生的成本。现在,如果当天没有收到请求,我的 Kafka 消费者将处于理想状态,如果收到太多请求,则会出现延迟。

我们希望将这些消费者保持在自动缩放模式,这样,如果请求数量激增,我的服务器(Kafka 消费者)数量将会增加。一旦请求数量减少,我们将删除服务器。因此,Kafka分区必须相应增加或减少

  1. Kafka允许增加分区。这种情况下,如何动态减少Kafka分区呢?
  2. 还有其他解决方案来处理这种自动缩放吗?

apache-kafka kafka-consumer-api

0
推荐指数
1
解决办法
3468
查看次数