标签: kafka-topic

Kafka Streams 测试:java.util.NoSuchElementException:未初始化的主题:“output_topic_name”

我已经按照https://kafka.apache.org/24/documentation/streams/developer-guide/testing.html为 kafka 流应用程序编写了一个测试类 ,其代码是

import com.EventSerde;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.Properties;

public class KafkaStreamsConfigTest {

private TopologyTestDriver testDriver;
private TestInputTopic<String, Object> inputTopic;
private TestOutputTopic<String, Object> outputTopic;

private Serde<String> stringSerde = new Serdes.StringSerde();
private EventSerde eventSerde= new EventSerde();

private String key="test";
private Object value = "some value";
private Object expected_value = "real value";

String kafkaEventSourceTopic = "raw_events";
String kafkaEventSinkTopic = "processed_events";
String kafkaCacheSinkTopic = "cache_objects";

String applicationId = "my-app";
String …
Run Code Online (Sandbox Code Playgroud)

unit-testing apache-kafka apache-kafka-streams kafka-topic spring-kafka-test

6
推荐指数
1
解决办法
1458
查看次数

压缩后的Kafka主题可以用作键值数据库吗?

在许多文章中,我读到压缩的 Kafka 主题可以用作数据库。但是,在查看 Kafka API 时,我找不到允许我根据键查询主题的值的方法。

那么,压缩后的Kafka主题可以用作(高性能、只读)键值数据库吗?

在我的架构中,我想为组件提供一个紧凑的主题。我想知道该组件是否需要在其本地数据库中拥有该主题的副本,或者是否可以使用该压缩主题作为键值数据库。

apache-kafka kafka-topic

6
推荐指数
1
解决办法
3409
查看次数

Kafka 主题级配置更改何时生效?

有如下文件所述,可以在 Kafka 集群仍在运行时进行 Kafka 主题级别的配置更改。

我的问题是:

apache-kafka kafka-topic

5
推荐指数
0
解决办法
206
查看次数

kafka 主题分区的最大复制因子是多少

我的 kafka 集群有 3 个代理和几个主题,每个主题有 5 个分区。现在我想为分区设置复制因子。

我可以为 kafka 主题的分区设置的最大复制因子是多少?

replication partitioning apache-kafka kafka-topic

5
推荐指数
1
解决办法
2844
查看次数

Kafka regex-topics/听多个主题的实际限制是什么

I am exploring different PubSub platforms and I was wondering what the limits are in Kafka for listening to multiple topics. Consider for instance this Use Case. We have trains, station entry gates, devices that all publish their telemetry. Currently this is done on a MQ but as data rates increase, smart trains etc. we need to move to a new PubSub/streaming platform and Kafka is on that list of course.

As I see it there are two strategies for …

publish-subscribe apache-kafka telemetry kafka-topic

5
推荐指数
1
解决办法
168
查看次数

Apache Kafka 中的分区领导者是什么?

kafka 领导者是自己分区还是经纪人?我最初的理解是,它们是充当读/写代理的分区,然后将它们的价值交给 ISR。

但是最近我听到他们提到他们好像发生在“经纪人”级别,因此我很困惑。

我知道还有其他帖子旨在回答这个问题,但那里的答案没有帮助。

leader broker apache-kafka kafka-topic kafka-partition

5
推荐指数
3
解决办法
7739
查看次数

Kafka Log Compacted Topic Duplication Values 未删除针对同一键

日志压缩主题不应该针对相同的键保留重复。但在我们的例子中,当发送具有相同键的新值时,不会删除前一个值。可能是什么问题?

val TestCompactState: KTable[String, TestCompact] = builder.table[String, TestCompact](kafkaStreamConfigs.getString("testcompact-source"),
   (TestCompactmaterialized).withKeySerde(stringSerde).withValueSerde(TestCompactSerde)) 

Run Code Online (Sandbox Code Playgroud)

我得到的 实际结果

Offsets      Keys        Messages
5            {"id":5}   {"id":5,"namee":"omer","__deleted":"false"}
6            {"id":5}   {"id":5,"namee":"d","__deleted":"false"}
Run Code Online (Sandbox Code Playgroud)

我只想要针对相同关键预期结果的最新记录

6            {"id":5}   {"id":5,"namee":"d","__deleted":"false"}
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-producer-api apache-kafka-streams apache-kafka-connect kafka-topic

5
推荐指数
2
解决办法
758
查看次数

Python-Kafka:无限轮询主题

我正在使用 python-kafka 来收听 kafka 主题并使用该记录。我想让它无限轮询而不退出。这是我的代码如下:

def test():
    consumer = KafkaConsumer('abc', 'localhost:9092', auto_offset_reset='earliest')
    for msg in consumer:
        print(msg.value)
Run Code Online (Sandbox Code Playgroud)

这段代码只是读取数据,直接退出。有没有办法即使没有推送消息也可以继续收听主题?

任何持续监控该主题的相关示例对我来说也很棒。

python apache-kafka kafka-consumer-api kafka-python kafka-topic

5
推荐指数
1
解决办法
8136
查看次数

在Java中哪里设置参数min.insync.replicas和ack?

我需要设置两个参数min.insync.replicasacks。官方文档说该参数min.insync.replicas是broker的参数。我是否正确理解,对于所有主题,都应该在 server.properties 文件中指定它?其中之一是使用命令 kafka.config.sh。Acks参数只能在配置生产者时设置,例如从应用程序?更改文件 Producer.properties 没有帮助吗?

java broker apache-kafka kafka-topic

4
推荐指数
1
解决办法
4066
查看次数

如何在 Python 中以编程方式获取每个 Kafka 主题分区的最新偏移量

我是 Kafka 新手,想要获取每个分区的 Kafka 主题的位置。我在文档中看到 - https://kafka- python.readthedocs.io/en/master/apidoc/KafkaAdminClient.html#kafkaadminclient - 偏移量可以通过函数获得KafkaAdminClient.list_consumer_group_offsets,但我没有看到这样的方法那里的位置。

有人知道我怎样才能得到它吗?

python apache-kafka kafka-topic

4
推荐指数
1
解决办法
4401
查看次数