标签: kafka-consumer-api

Kafka 流过滤:经纪人还是消费者?

我正在研究 kafka 流。我想使用选择性非常低的过滤器(几千分之一)过滤我的流。我在看这个方法:https : //kafka.apache.org/0100/javadoc/org/apache/kafka/streams/kstream/KStream.html#filter(org.apache.kafka.streams.kstream.Predicate)

但是我找不到任何证据,如果过滤器将由消费者评估(我真的不想将大量 GB 转移给消费者,只是为了扔掉它们),或者在经纪人内部(耶!)。

如果它在消费者方面进行评估,有什么办法,如何在经纪人中做到这一点?

谢谢!

apache-kafka kafka-consumer-api apache-kafka-streams

2
推荐指数
1
解决办法
5644
查看次数

Kafka分区滞后增加

我有一个使用 Kafka 1.0 作为队列的应用程序。Kafka 主题有 80 个分区和 80 个正在运行的消费者。(Kafka-python 消费者)。

通过运行命令:

./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mygroup  --describe 
Run Code Online (Sandbox Code Playgroud)

我看到其中一个分区卡在一个偏移处,并且随着新记录的添加,延迟不断增加。

上述命令的输出如下所示:

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST

118 mytopic                       37         1924            2782            858        kafka-python-1.3.4-3da99d4d-63e8-4e72-967e-xxxxxxxxxxx/localhost
119 mytopic                       38         2741            2742            1          kafka-python-1.3.4-40b44482-39fc-42d0-8f55-xxxxxxxxxxx/localhost
120 mytopic                       39         2713            2713            0          kafka-python-1.3.4-4121d080-1d7c-4d6b-ac58-xxxxxxxxxxx/localhost
121 mytopic                       40         2687            2688            1          kafka-python-1.3.4-43441f6e-fd35-448e-b791-xxxxxxxxxxx/localhost
Run Code Online (Sandbox Code Playgroud)

这是什么原因造成的?此外,使用 reset-offsets 命令重置偏移也是不可取的,因为可能不会定期手动监控此服务器。

客户端在 Linux m/c 中作为并行进程在后台运行:

consumer = KafkaConsumer('mytopic', group_id='mygroup', bootstrap_servers='localhost:9092',
                     session_timeout_ms=120000, heartbeat_interval_ms=100000, max_poll_records=1,
                     auto_commit_interval_ms=100000, request_timeout_ms=350000, max_partition_fetch_bytes=3*1024*1024,
                     value_deserializer=lambda m: json.loads(m.decode('ascii')))

for message in consumer:
    msg …
Run Code Online (Sandbox Code Playgroud)

python apache-kafka kafka-consumer-api kafka-python

2
推荐指数
1
解决办法
1万
查看次数

休息控制器通过spring kafka返回kafka中的记录

对于我的演示应用程序,我必须创建一个 rest 控制器来返回 kafka 队列中的消息。我已经阅读了 spring-kafka 参考指南并实现了消费者配置并创建了如下的 bean

@Configuration
@EnableKafka
public class ConsumerConfiguration {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections to the Kakfa cluster
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        // allows a pool of processes to divide the work of consuming and processing records
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "trx");

        return props;
    }

    @Bean
    public ConsumerFactory<String, Transaction> transactionConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>( …
Run Code Online (Sandbox Code Playgroud)

apache-kafka spring-boot kafka-consumer-api spring-web spring-kafka

2
推荐指数
1
解决办法
1234
查看次数

CURRENT-OFFSET中的破折号代表什么

参考以下消费者组描述的屏幕截图,我试图了解“-”在这里对于 CURRENT-OFFSET 意味着什么。它是否表示即使分区已分配给消费者,也不会从分区 1 和 3 消费消息。分区 1 和 3 的 LOG-END 偏移量分别为 281 和 277。

在此处输入图片说明

apache-kafka kafka-consumer-api

2
推荐指数
1
解决办法
3538
查看次数

如何从字符串创建 Kafka 消费者记录以构建 Junit 测试用例

我需要一些帮助来为我的 Java kafka 使用者构建 Junit 测试用例。

我的原始源代码具有如下方法,并且需要为此创建一个单元测试用例。

public void processConsumerRecord(ConsumerRecords<String, GenericRecord> records, boolean isEventProcessed, boolean isOffsetCommitted,
                                  int totalErrorCountFromSinkService, int totalErrorCount, Consumer<String, GenericRecord> consumer) {
Run Code Online (Sandbox Code Playgroud)

...... }

我的 Kafka 消费者正在从 kafka 主题中提取消息,我需要能够以 ConsumerRecords 格式提供输入消息,但作为单元测试的一部分,我没有轮询来自 kafka 的消息,而是模拟来自原始 kafka 主题的消息并提供测试上述方法的单元测试用例的静态输入消息,如图所示。如何以 ConsumerRecords < String, GenericRecord > 的形式创建模拟输入消息?

java junit apache-kafka kafka-consumer-api

2
推荐指数
1
解决办法
4577
查看次数

我如何找到 kafka 消费者的费率?

有什么方法可以找到 kafka 消费者消费消息的速度吗?像 5000 条消息/秒

apache-kafka kafka-consumer-api

2
推荐指数
1
解决办法
1658
查看次数

kafka-console-consumer 使用什么消费群

当我像这样运行 kafka-console-consumer 时

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
Run Code Online (Sandbox Code Playgroud)

它默认为哪个消费者组?如果我没有在命令行中指定消费者组或链接到消费者属性,它最终会使用随机消费者组吗?如何检查使用了哪个消费者组?

谢谢!

apache-kafka kafka-consumer-api

2
推荐指数
1
解决办法
1043
查看次数

如何使用java获得kafka滞后

我目前开发了一个代码来显示主题、分区和日志偏移量。但我目前被困在如何获得分区的滞后上。我知道有一个 kafka offset 命令可以完成这个功能,但我需要的是一个 java 代码。

public static void main(String[] args) throws Exception {
    System.out.println("START CONSUMER");final Properties props = new Properties();
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUPID);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

    // Create the consumer using props.
    final Consumer<Long, String> consumer =  new KafkaConsumer<>(props);

    // Subscribe to the topic.
    int i = 0;
    ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
    for (i=0;i<consumer.partitionsFor(TOPIC).size();i++)
    {
        TopicPartition partitiontemp = new TopicPartition(TOPIC, i);
        partitions.add(partitiontemp);
    }
    consumer.assign(partitions);
    consumer.seekToEnd(partitions);

    for (i=0;i<consumer.partitionsFor(TOPIC).size();i++)
    {
        System.out.printf("Topic: %s …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka kafka-consumer-api

2
推荐指数
1
解决办法
3388
查看次数

卡夫卡消费者不返回任何事件

下面的 Scala kafka 消费者没有从 poll调用中。

但是,主题是正确的,我可以看到使用控制台使用者将事件发送到主题:

/opt/kafka_2.11-0.10.1.0/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic my_topic --from-beginning
Run Code Online (Sandbox Code Playgroud)

当我使用调试器逐步完成并调用时,我还在下面的 Scala 代码示例中看到了该主题 kafkaConsumer.listTopics()

此外,这是从单个单元测试中调用的,所以我只创建了这个特征和消费者的一个实例(即另一个消费者实例不能消费消息)。我也在使用随机 group_id。

下面的代码/配置有什么问题吗?

import java.util.Properties

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

import scala.util.Random

trait KafkaTest {

  val kafkaConsumerProperties = new Properties()

  kafkaConsumerProperties.put("bootstrap.servers", "kafka:9092")

  kafkaConsumerProperties.put("group.id", Random.alphanumeric.take(10).mkString)

  kafkaConsumerProperties.put("key.deserializer", classOf[ByteArrayDeserializer])

  kafkaConsumerProperties.put("value.deserializer", classOf[StringDeserializer])

  val kafkaConsumer = new KafkaConsumer[String, String](kafkaConsumerProperties)

kafkaConsumer.subscribe(java.util.Collections.singletonList("my_topic"))

  def checkKafkaHasReceivedEvent(): Assertion = {

    val kafkaEvents = kafkaConsumer.poll(2000) // Always returns 0 events?
    ...
  }
}
Run Code Online (Sandbox Code Playgroud)

增加轮询超时也无济于事。

scala apache-kafka kafka-consumer-api

2
推荐指数
1
解决办法
1092
查看次数

Kafka 消费者因使用未知协议反序列化 Avro 消息时出错而失败

在我的用例中,我创建了 JDBC kafka 连接器,从 oracle 表中提取数据并成功推送到 kafka 主题,但是当我尝试读取来自这个 kafka 主题的消息时,我遇到了如下所示的反序列化问题。

org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 2
Caused by: java.net.MalformedURLException: unknown protocol: localhost
        at java.net.URL.<init>(URL.java:593)
        at java.net.URL.<init>(URL.java:483)
        at java.net.URL.<init>(URL.java:432)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:124)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:188)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:330)
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-consumer-api apache-kafka-connect

2
推荐指数
1
解决办法
4878
查看次数