是否可以合并kafka中的记录并将输出发布到不同的流?
例如,有一个针对 kafka 主题的事件流,如下所示
{txnId:1,startTime:0900},{txnId:1,endTime:0905},{txnId:2,endTime:0912},{txnId:3,endTime:0930},{txnId:2,startTime:0912}, {txnId:3,开始时间:0925}......
我想通过 txnId 合并这些事件并创建合并的输出,如下所示
{txnId:1,startTime:0900,endTime:0905},{txnId:2,startTime:0910,endTime:0912},{txnId:3,startTime:0925,endTime:0930}
请注意,传入事件中不会维护顺序。因此,如果在开始时间事件之前收到 txn Id 的 endTime,那么我们需要等到收到该 txnId 的开始时间事件后再启动合并
我浏览了 Kafka Streams 示例附带的字数示例,但不清楚如何等待事件,然后在进行转换时合并。
任何想法都受到高度赞赏。
apache-kafka kafka-consumer-api apache-kafka-streams confluent-platform
我有一个 Kafka 消费者组,消费多个主题(每个主题有多个分区)。所有主题的每个分区上都包含大量记录。我目前正在尝试理解消费者最初开始消费时的行为。特别是,我想知道经纪人如何决定哪些记录首先到达客户。
以下几个方面值得注意:
max.poll.records配置更多的记录)我在消费者配置参数中找不到任何允许我更改此行为的内容。这并不是真正的问题,因为所有消息最终都会被读取。但我想更详细地了解这种行为。
所以我的问题是:代理如何决定哪些记录最终出现在消费者轮询循环的结果中?
我正在使用 @RetryableTopic 在 kafka 消费者中实现重试逻辑。我给出的配置如下:
@RetryableTopic(
attempts = "4",
backoff = @Backoff(delay = 300000, multiplier = 10.0),
autoCreateTopics = "false",
topicSuffixingStrategy = SUFFIX_WITH_INDEX_VALUE
)
Run Code Online (Sandbox Code Playgroud)
然而,它不是重试 4 次,而是无限重试,而且也没有延迟时间。有人可以帮我解决代码吗?我希望该消息重试 4 次,第一次延迟 - 5 分钟后,然后第二次延迟 10 分钟后,第三次延迟 20 分钟后......
代码如下:
int i = 1;
@RetryableTopic(
attempts = "4",
backoff = @Backoff(delay = 300000, multiplier = 10.0),
autoCreateTopics = "false",
topicSuffixingStrategy = SUFFIX_WITH_INDEX_VALUE
)
@KafkaListener(topics = "topic_string_data", containerFactory = "default")
public void consume(@Payload String message , @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
String prachi = null; …Run Code Online (Sandbox Code Playgroud) 我需要开始使用卡夫卡。我很难弄清楚消费者应该收到什么:根据我的理解,我们可以通过多种方式配置消费者:
示例1:
@KafkaListener(topics = "topic_name)
public void receiveSimpleString(@Payload String message) {
}
Run Code Online (Sandbox Code Playgroud)
示例2:
@KafkaListener(topics = "topic_name)
public void receiveConsumerRecord(@Payload ConsumerRecord<String, String> message) {
}
Run Code Online (Sandbox Code Playgroud)
示例3:
@KafkaListener(topics = "topic_name)
public void receiveObject(@Payload SomeCustomClass message) {
}
Run Code Online (Sandbox Code Playgroud)
示例4:
@KafkaListener(topics = "topic_name)
public void receiveSpringMessage(@Payload org.springframework.messaging.Message<T> message) {
}
Run Code Online (Sandbox Code Playgroud)
也许还有更多的方法,但那些曾经是我在研究 kafka+spring 时最常看到的。
现在的问题是:
是否有关于消费者应该收到什么的最佳实践?不同的例子有优点/缺点吗?
java apache-kafka spring-boot kafka-consumer-api spring-kafka
当主题不存在时,我不想从我的消费者应用程序自动创建主题。
我知道这是一个 Kafka 服务器级别的配置,用于禁用自动主题创建 ( auto.create.topics.enable = false),但我无法在我的基础设施中进行此更改。
因此,我正在寻找一种方法来禁用我的消费者应用程序中的自动主题创建(使用 Spring Kafka)。
我尝试设置
spring:
kafka:
consumer:
properties:
allow.auto.create.topics: false
Run Code Online (Sandbox Code Playgroud)
但它不起作用!
似乎 Kafka 添加了此支持: https://cwiki.apache.org/confluence/display/KAFKA/KIP-361%3A+Add+Consumer+Configuration+to+Disable+Auto+Topic+Creation
有人可以帮忙吗?
我是Kafka的新手,并且使用新的KafkaProducer和KafkaConsumer,版本:0.9.0.1
java中是否有任何方法可以在创建特定主题后更改/更新特定主题的分区数.
我没有使用zookeeper来创建主题.我的KafkaProducer会在发布请求到达时自动创建主题.
如果这些还不够,我还可以提供更多细节
我有一个Spark流传输过程,该过程将从kafka读取数据到DStream中。
在我的管道中,我做了两次(一个接一个):
DStream.foreachRDD(在RDD上转换并插入到目标中)。
(每次我进行不同的处理并将数据插入到不同的目的地)。
当我从Kafka读取数据后,我想知道DStream.cache是如何工作的吗?有可能做到吗?
现在该过程实际上是从Kafka读取数据两次吗?
请记住,不可能将两个foreachRDD放在一个中(因为两个路径完全不同,所以那里有状态转换-需要在DStream上应用...)
谢谢你的帮助
我正在使用Kafka 0.9.1新的消费者API。使用者被手动分配给分区。对于这个消费者,我希望看到它的进展(意味着滞后)。由于我将组ID消费者教程添加为属性,因此我假设可以使用以下命令
bin/kafka-consumer-groups.sh --new-consumer --describe --group consumer-tutorial --bootstrap-server localhost:9092
不幸的是,使用上述命令未显示我的消费者组详细信息。因此,我无法监视我的消费者的进度(这是滞后的)。在上述场景(手动分配的分区)中,如何监控延迟?
代码是:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-tutorial");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
String topic = "my-topic";
TopicPartition topicPartition = new TopicPartition(topic, 0);
consumer.assign(Arrays.asList(topicPartition));
consumer.seekToBeginning(topicPartition);
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records)
System.out.println(record.offset() + ": " + record.value());
consumer.commitSynch();
}
} finally {
consumer.close();
}
Run Code Online (Sandbox Code Playgroud) 我是Kakfa Spring集成的新手.我已经实现了Kafka消息发送和One Listener,它对我来说很好.但是我希望在两个地方听到同样的信息.谁能帮我.以下是我的代码.
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.group-id=sample-group
spring.kafka.producer.batch-size= 16384
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.retries= 0
spring.kafka.producer.buffer-memory=33554432
spring.kafka.template.default-topic=spring-topic
Run Code Online (Sandbox Code Playgroud)
发件人代码:
public void testSimple() {
System.out.println("Sending Topic Here");
template.send("spring-topic", 0, "foo");
template.flush();
}
Run Code Online (Sandbox Code Playgroud)
接收器:
@KafkaListener(id = "foo", topics = "spring-topic")
public void listen1(String foo) {
System.out.println("Got Notification Here");
this.latch1.countDown();
}
Run Code Online (Sandbox Code Playgroud)
可以帮助我如何在不同的地方阅读相同的消息.
apache-kafka kafka-consumer-api kafka-producer-api spring-kafka
我已经为Kafka中的主题设置了TTL为7天,我从Kafka数据库中获取数据并将其存储在数据库中,但是从过去5天开始我的数据库服务器已关闭,现在我必须从过去5天获取消息Kafka并将其存储在数据库中注意:从过去5天开始没有问题Kafka.