我一直在尝试使用Helm 图表来部署 Kafka 。所以我为 Kafka Pod 定义了 NodePort 服务。我使用相同的主机和端口检查了控制台 Kafka 生产者和消费者 - 它们工作正常。但是,当我创建 Spark 应用程序作为数据消费者并创建 Kafka 作为生产者时,它们无法连接到 Kafka service0。我使用 minikube ip (而不是节点 ip)作为主机和服务 NodePort 端口。尽管在 Spark 日志中,我看到 NodePort 服务解析端点,并且代理被发现为 pod 寻址和端口:
INFO AbstractCoordinator: [Consumer clientId=consumer-1, groupId=avro_data] Discovered group coordinator 172.17.0.20:9092 (id: 2147483645 rack: null)
INFO ConsumerCoordinator: [Consumer clientId=consumer-1, groupId=avro_data] Revoking previously assigned partitions []
INFO AbstractCoordinator: [Consumer clientId=consumer-1, groupId=avro_data] (Re-)joining group
WARN NetworkClient: [Consumer clientId=consumer-1, groupId=avro_data] Connection to node 2147483645 (/172.17.0.20:9092) could not be established. Broker may not …
Run Code Online (Sandbox Code Playgroud) 我对 kafka 文档中关于这个主题的措辞有点困惑,所以我想在这里问我是否正确地解释了这些内容?
因此,如果我正确理解这种扩展 Kafka Stream 应用程序的唯一方法是启动应用程序的新实例(或增加 application 中的流线程数量),这将确保 ConsumerGroup('application. id'),这样我就可以将流应用程序扩展到主题的分区数量(如果我的流拓扑连接到多个主题,实际上会发生什么,假设 TopicA 有 5 个分区,topicB 有 3 个分区,我加入了 TopicA 和 TopicB 的流,我猜在这种情况下我可以扩展到 3 个实例/线程)。
现在假设我有一个包含 5 个分区的 topicA,并且启动了应用程序的 3 个实例,如果我在拓扑中配置了 KTable,则每个 KTable 将包含来自特定分区的信息,并且我必须找出我的哪个实例(分区)上的元数据关键是,那么当我启动第四个实例时会发生什么,假设实例3上的KTable的键/值现在可以转到实例4上的KTable,不是吗?一方面问题是这样的重新平衡需要多长时间(我认为这取决于主题大小,所以假设需要 1 分钟,我正在查询 KTable 的应用程序在此操作期间会没有响应吗?)
附带问题是,此机制对于“streamBuilder.table(..)”和“streambuilder.groupByKey(..).reduce(..)”的工作原理是否完全相同?
最后一个问题,同样是一个具有 5 个分区的主题,但我没有启动 3 个应用程序实例,而是启动了一个具有 3 个流线程的实例 (num.stream.threads = 3),我会再次拥有 3 个 KTable 代表 5 个分区吗?如果我将线程大小从 3 更改为 4,其行为与增加实例数完全相同。
感谢您的回答..
我无法在此处以及 Spring 网站和博客上找到 Spring Cloud Stream 是否能够提供 Kafka Stream API 提供的“Exactly Once”语义。也许没有单个配置/注释,并且在线程“是否可以使用 Spring Cloud Stream 进行一次处理? ”我可以找到一些有用的东西,但专家的答案是非常高的水平。感谢帮助
我正在致力于在 Prod 中扩展 kafka 集群。Confluence 提供了添加 kafka 代理的简单方法。但是,我如何知道如何与 Kafka 一起扩展 Zookeeper。比例应该是多少?现在我们有 5 个 Zookeeper 节点用于 5 个 kafka 代理。如果我有 10 个 kafka 代理,应该有多少个 Zookeeper 节点?
我有一些列表想通过卡夫卡生产者发送。
listA [1,2,3]
listB ["cat", "dog", "fish"]
Run Code Online (Sandbox Code Playgroud)
生产者以字节形式发送消息,因此我不确定如何正确设置消息,以便在需要引号来发送消息时发送列表。这就是我目前所拥有的。
producer = KafkaProducer(bootstrap_servers='localhost:9092')
for _ in range(1):
print(producer.send('test', b'"worker_id": listA , "worker_name" : listB'))
Run Code Online (Sandbox Code Playgroud)
这个方法只是给我一个语法错误。我也尝试过下面的方法,得到了类似的结果
print(producer.send('test', b("worker_uuid": worker_uuid))
Run Code Online (Sandbox Code Playgroud) 任何人都可以解释并指导我链接或资源来阅读有关卡夫卡消费者在以下场景中如何工作的信息。
一个消费者组有5个消费者,主题有3个分区(kafka如何决定)
一个Consumer group有5个consumer,topic有10个partition(kafka如何分担负载)
两个消费者组,每个消费者组有 1 个消费者,kafka 集群有 2 个服务器,其中一个主题在节点 1 和节点 2 之间进行分区,当来自不同组的消费者订阅一个分区时,如何避免重复。
上述可能不是配置 kafka 时的最佳实践,但我需要知道它是如何处理的。
提前致谢。
我正在尝试实现一种通过使用 KafkaConsumer.assign(partition)、KafkaConsumer.seek(partition, offset) 随机访问来自 Kafka 的消息的方法。然后读取一条消息的轮询。
但在这种情况下我每秒无法收到超过 500 条消息。相比之下,如果我“订阅”该分区,我每秒会收到 100,000+ 条消息。(@1000 字节消息大小)
我试过了:
在所有情况下,我得到的最小值约为 200 条消息/秒。如果我使用 2-3 个线程,则最大值为 500。但上面的操作使得“.poll()”调用花费的时间越来越长(从单线程的 3-4 毫秒到 10 个线程的 40-50 毫秒)。
我天真的 kafka 理解是消费者打开与代理的连接并发送请求以检索其日志的一小部分。虽然所有这些都会涉及一些延迟,并且检索一批消息会好得多 - 我想它会随着涉及的接收器数量的增加而扩展,但代价是增加运行消费者的虚拟机和运行代理的虚拟机。但两人都在闲置。
显然,代理端发生了一些同步,但我无法弄清楚这是否是由于我使用 Kafka 或使用 .seek 的一些固有限制所致
我希望得到一些关于我是否应该尝试其他方法的提示,或者这就是我所能得到的。
我有一个 kakfa 生产者,我不需要序列化键,只需序列化值。但生产者配置需要“key.serializer”设置。
Map<String, Object> producerConfig = new HashMap<>();
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
Run Code Online (Sandbox Code Playgroud)
当配置了实际上未使用的随机设置时,我发现它很令人困惑。
有没有办法不设置“key.serializer”设置来表示没有密钥序列化?
我正在使用 websocket 协议和 STOMP 作为消息协议为网络平台开发聊天模块。
这是我第一次使用任何消息代理,Kafka 是在公司(我正在工作的)网络平台上使用的消息代理,我猜是用于其他模块。之前我刚开始时使用的是 RabbitMQ,现在我必须切换到 Kafka。我在RabbitMQ的网站上看到有一整篇关于如何使用STOMP的文章,但是Kafka的官方网站上没有这样的东西。
但我探索了其他几个来源、许多教程,但找不到任何与 Kafka 一起使用 STOMP 协议相关的内容,这让我问这是否可能?
这是我的 websocket 配置类:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer{
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws").withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.setApplicationDestinationPrefixes("/app");
registry.enableSimpleBroker("/topic");
//Here's the line I wrote to use Kafka as a MB, but doesn't work
registry.enableStompBrokerRelay("/topic").setRelayHost("localhost").setRelayPort(9092);
Run Code Online (Sandbox Code Playgroud)
启动 Kafka,然后运行我的 Java Spring 应用程序后,我从 java.io.IOException 中收到“连接由对等方重置”,如果一切正常,则不应抛出该异常。
我使用 Kakfa 的 2.2.0 版本、Zookeeper 的 3.4.14 版本,并使用 STS 3 作为我的 IDE。
任何帮助,将不胜感激。
我有使用 Kafka Streams 处理的记录(使用处理器 API)。假设该记录有city_id
和一些其他字段。
在 Kafka Streams 应用程序中,我想将目标城市的当前温度添加到记录中。
Temperature<->City
对存储在例如中。Postgres。
在 Java 应用程序中,我可以使用 JDBC 连接到 Postgres 并进行构建,new HashMap<CityId, Temperature>
这样我就可以根据city_id
. 就像是tempHM.get(record.city_id)
。
有几个问题如何最好地处理它:
最初,我一直在内部执行此操作,AbstractProcessor::init()
但这似乎是错误的,因为它是为每个线程初始化的,并且还在重新平衡时重新初始化。
因此,我在使用它构建流拓扑构建器和处理器之前移动了它。在所有处理器实例上仅独立提取一次数据。
这是正确有效的方法吗?它有效,但是...
HashMap<CityId, Temperature> tempHM = new HashMap<CityId, Temperature>;
// Connect to DB and initialize tempHM here
Topology topology = new Topology();
topology
.addSource(SOURCE, stringDerializer, protoDeserializer, "topic-in")
.addProcessor(TemperatureAppender.NAME, () -> new TemperatureAppender(tempHm), SOURCE)
.addSink(SINK, "topic-out", stringSerializer, protoSerializer, TemperatureAppender.NAME)
;
Run Code Online (Sandbox Code Playgroud)
例如,我想每 15 分钟刷新一次温度数据。我正在考虑使用 Hashmap 容器而不是 Hashmap,这样可以处理它: …
apache-kafka ×10
java ×2
apache-spark ×1
kubernetes ×1
list ×1
spring-kafka ×1
stomp ×1
websocket ×1