标签: kafka-consumer-api

合并kafka流中的记录

是否可以合并kafka中的记录并将输出发布到不同的流?

例如,有一个针对 kafka 主题的事件流,如下所示

{txnId:1,startTime:0900},{txnId:1,endTime:0905},{txnId:2,endTime:0912},{txnId:3,endTime:0930},{txnId:2,startTime:0912}, {txnId:3,开始时间:0925}......

我想通过 txnId 合并这些事件并创建合并的输出,如下所示

{txnId:1,startTime:0900,endTime:0905},{txnId:2,startTime:0910,endTime:0912},{txnId:3,startTime:0925,endTime:0930}

请注意,传入事件中不会维护顺序。因此,如果在开始时间事件之前收到 txn Id 的 endTime,那么我们需要等到收到该 txnId 的开始时间事件后再启动合并

我浏览了 Kafka Streams 示例附带的字数示例,但不清楚如何等待事件,然后在进行转换时合并。

任何想法都受到高度赞赏。

apache-kafka kafka-consumer-api apache-kafka-streams confluent-platform

2
推荐指数
1
解决办法
2852
查看次数

当剩余的记录多于“max.poll.records”时,Kafka 如何决定消费者轮询循环中包含哪些记录?

我有一个 Kafka 消费者组,消费多个主题(每个主题有多个分区)。所有主题的每个分区上都包含大量记录。我目前正在尝试理解消费者最初开始消费时的行为。特别是,我想知道经纪人如何决定哪些记录首先到达客户。

以下几个方面值得注意:

  • 记录的数量比消费者在一次往返中可以处理的记录多得多(即比消费者的max.poll.records配置更多的记录)
  • 消费者必须读取来自多个主题和多个分区的记录
  • 我天真地假设代理在每个轮询循环中返回每个主题的记录,以便消费者以相似的速度读取所有主题。但情况似乎并非如此。显然,它一次优先考虑单个主题的记录,在没有明显模式的情况下切换主题(至少这是我在消费者的指标中看到的)。

我在消费者配置参数中找不到任何允许我更改此行为的内容。这并不是真正的问题,因为所有消息最终都会被读取。但我想更详细地了解这种行为。

所以我的问题是:代理如何决定哪些记录最终出现在消费者轮询循环的结果中?

apache-kafka kafka-consumer-api

2
推荐指数
1
解决办法
1170
查看次数

spring kafka消费者中的无限重试@retryabletopic

我正在使用 @RetryableTopic 在 kafka 消费者中实现重试逻辑。我给出的配置如下:

@RetryableTopic(
            attempts = "4",
            backoff = @Backoff(delay = 300000, multiplier = 10.0),
            autoCreateTopics = "false",
            topicSuffixingStrategy = SUFFIX_WITH_INDEX_VALUE
    )
Run Code Online (Sandbox Code Playgroud)

然而,它不是重试 4 次,而是无限重试,而且也没有延迟时间。有人可以帮我解决代码吗?我希望该消息重试 4 次,第一次延迟 - 5 分钟后,然后第二次延迟 10 分钟后,第三次延迟 20 分钟后......

代码如下:

int i = 1;

@RetryableTopic(
        attempts = "4",
        backoff = @Backoff(delay = 300000, multiplier = 10.0),
        autoCreateTopics = "false",
        topicSuffixingStrategy = SUFFIX_WITH_INDEX_VALUE
)
@KafkaListener(topics = "topic_string_data", containerFactory = "default")
public void consume(@Payload String message , @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    String prachi = null; …
Run Code Online (Sandbox Code Playgroud)

apache-kafka spring-retry kafka-consumer-api spring-kafka

2
推荐指数
1
解决办法
1万
查看次数

Springs Kafka Consumer最佳实践:消费者应该收到什么样的消息

我需要开始使用卡夫卡。我很难弄清楚消费者应该收到什么:根据我的理解,我们可以通过多种方式配置消费者:

示例1:

@KafkaListener(topics = "topic_name)
public void receiveSimpleString(@Payload String message) {
}
Run Code Online (Sandbox Code Playgroud)

示例2:

@KafkaListener(topics = "topic_name)
public void receiveConsumerRecord(@Payload ConsumerRecord<String, String> message) {
}
Run Code Online (Sandbox Code Playgroud)

示例3:

@KafkaListener(topics = "topic_name)
public void receiveObject(@Payload SomeCustomClass message) {
}
Run Code Online (Sandbox Code Playgroud)

示例4:

@KafkaListener(topics = "topic_name)
public void receiveSpringMessage(@Payload org.springframework.messaging.Message<T> message) {
}
Run Code Online (Sandbox Code Playgroud)

也许还有更多的方法,但那些曾经是我在研究 kafka+spring 时最常看到的。

现在的问题是:

是否有关于消费者应该收到什么的最佳实践?不同的例子有优点/缺点吗?

java apache-kafka spring-boot kafka-consumer-api spring-kafka

2
推荐指数
1
解决办法
942
查看次数

从 Spring Kafka Consumer 禁用自动主题创建

当主题不存在时,我不想从我的消费者应用程序自动创建主题。

我知道这是一个 Kafka 服务器级别的配置,用于禁用自动主题创建 ( auto.create.topics.enable = false),但我无法在我的基础设施中进行此更改。

因此,我正在寻找一种方法来禁用我的消费者应用程序中的自动主题创建(使用 Spring Kafka)。

我尝试设置

spring:
  kafka:
    consumer:
      properties:
        allow.auto.create.topics: false
Run Code Online (Sandbox Code Playgroud)

但它不起作用!

似乎 Kafka 添加了此支持: https://cwiki.apache.org/confluence/display/KAFKA/KIP-361%3A+Add+Consumer+Configuration+to+Disable+Auto+Topic+Creation

有人可以帮忙吗?

java apache-kafka kafka-consumer-api spring-kafka

2
推荐指数
1
解决办法
6263
查看次数

Kafka:使用java更改特定主题的分区数

我是Kafka的新手,并且使用新的KafkaProducer和KafkaConsumer,版本:0.9.0.1

java中是否有任何方法可以在创建特定主题后更改/更新特定主题的分区数.

我没有使用zookeeper来创建主题.我的KafkaProducer会在发布请求到达时自动创建主题.

如果这些还不够,我还可以提供更多细节

java apache-kafka kafka-consumer-api kafka-producer-api

1
推荐指数
1
解决办法
3317
查看次数

在Spark Streaming中缓存DStream

我有一个Spark流传输过程,该过程将从kafka读取数据到DStream中。

在我的管道中,我做了两次(一个接一个):

DStream.foreachRDD(在RDD上转换并插入到目标中)。

(每次我进行不同的处理并将数据插入到不同的目的地)。

当我从Kafka读取数据后,我想知道DStream.cache是​​如何工作的吗?有可能做到吗?

现在该过程实际上是从Kafka读取数据两次吗?

请记住,不可能将两个foreachRDD放在一个中(因为两个路径完全不同,所以那里有状态转换-需要在DStream上应用...)

谢谢你的帮助

apache-spark spark-streaming kafka-consumer-api

1
推荐指数
1
解决办法
3426
查看次数

Kafka:监视分配给分区主题的使用者的延迟

我正在使用Kafka 0.9.1新的消费者API。使用者被手动分配给分区。对于这个消费者,我希望看到它的进展(意味着滞后)。由于我将组ID消费者教程添加为属性,因此我假设可以使用以下命令

bin/kafka-consumer-groups.sh --new-consumer --describe --group consumer-tutorial --bootstrap-server localhost:9092

(如此处http://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0.9-consumer-client所述

不幸的是,使用上述命令未显示我的消费者组详细信息。因此,我无法监视我的消费者的进度(这是滞后的)。在上述场景(手动分配的分区)中,如何监控延迟?

代码是:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-tutorial");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);


        String topic = "my-topic";
        TopicPartition topicPartition = new TopicPartition(topic, 0);
        consumer.assign(Arrays.asList(topicPartition));
        consumer.seekToBeginning(topicPartition);
try {
  while (true) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records)
     System.out.println(record.offset() + ": " + record.value());
  consumer.commitSynch();
  }
} finally {
  consumer.close();
}
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-consumer-api

1
推荐指数
1
解决办法
6370
查看次数

如何在多个消费者处读取相同的kafka消息

我是Kakfa Spring集成的新手.我已经实现了Kafka消息发送和One Listener,它对我来说很好.但是我希望在两个地方听到同样的信息.谁能帮我.以下是我的代码.

 spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.group-id=sample-group


spring.kafka.producer.batch-size= 16384
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.retries= 0
spring.kafka.producer.buffer-memory=33554432

spring.kafka.template.default-topic=spring-topic
Run Code Online (Sandbox Code Playgroud)

发件人代码:

public void testSimple() {
        System.out.println("Sending Topic Here");
        template.send("spring-topic", 0, "foo");
        template.flush();
    }
Run Code Online (Sandbox Code Playgroud)

接收器:

@KafkaListener(id = "foo", topics = "spring-topic")
    public void listen1(String foo) {
        System.out.println("Got Notification Here");
        this.latch1.countDown();
    }
Run Code Online (Sandbox Code Playgroud)

可以帮助我如何在不同的地方阅读相同的消息.

apache-kafka kafka-consumer-api kafka-producer-api spring-kafka

1
推荐指数
1
解决办法
1087
查看次数

如何使用Java从Kafka获取最近5天的消息

我已经为Kafka中的主题设置了TTL为7天,我从Kafka数据库中获取数据并将其存储在数据库中,但是从过去5天开始我的数据库服务器已关闭,现在我必须从过去5天获取消息Kafka并将其存储在数据库中注意:从过去5天开始没有问题Kafka.

java apache-kafka kafka-consumer-api

1
推荐指数
1
解决办法
1608
查看次数