小编fox*_*gen的帖子

KafkaStreams - InconsistentGroupProtocolException

我有一个Kafka Streams应用程序,它使用Kafka Streams DSL连接到我们的Kafka集群,如下所示:

KStreamBuilder builder = new KStreamBuilder();
KStream<String, byte[]> stream = builder.stream(myTopic);

// do work

kStreams = new KafkaStreams(builder, config);
kStreams.start();
Run Code Online (Sandbox Code Playgroud)

我的代码库的另一部分是直接使用客户端客户端与我们的集群建立连接.

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config, keyDeserializer, valueDeserializer);
consumer.subscribe(Collections.singletonList(sourceTopic));
consumer.poll(500L);
// etc
consumer.close();
Run Code Online (Sandbox Code Playgroud)

我这样做的原因是在有条件地启动应用程序的其他部分(包括Kafka Streams拓扑)之前收集有关使用者组的元数据.可能还有其他方法可以做到这一点(例如通过各种钩子或不是什么),但我更好奇为什么这些方法的混合有时会(间歇地)导致InconsistentGroupProtocolException被抛出.

请问有人可以解释为什么会被扔掉?我很难确定源代码本身到底发生了什么,但我想Kafka Streams构建的底层消费者正在指定与KafkaConsumer客户端不同的分区协议.无论如何,我们将非常感谢您对理解此异常的任何帮助

apache-kafka apache-kafka-streams

5
推荐指数
1
解决办法
1361
查看次数

Kafka Streams - 更新KTable上的聚合

我有一个KTable,其数据看起来像这样(key => value),其中key是客户ID,值是包含一些客户数据的小JSON对象:

1 => { "name" : "John", "age_group":  "25-30"}
2 => { "name" : "Alice", "age_group": "18-24"}
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }
Run Code Online (Sandbox Code Playgroud)

我想对这个KTable进行一些聚合,并且基本上保持每个记录的数量age_group.所需的KTable数据如下所示:

"18-24" => 3
"25-30" => 1
Run Code Online (Sandbox Code Playgroud)

让我们说Alice,谁在18-24上面的小组中,有一个生日,让她进入新的年龄组.支持第一个KTable的状态存储现在应该如下所示:

1 => { "name" : "John", "age_group":  "25-30"}
2 => { "name" : "Alice", "age_group": "25-30"} # Happy Cake Day
3 …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-streams

5
推荐指数
1
解决办法
4177
查看次数

Kafka Streams-是否可以在没有本地Kafka Streams实例的情况下运行远程交互式查询

我有一些我想远程查询的Kafka Streams应用程序实例。

当前,所有实例都在指定的host:port对上进行侦听,并且每个实例都能够查询其自己的本地状态存储并通过REST服务传达这些值。

+------------------+  +------------------+  +------------------+
|                  |  |                  |  |                  |
|  instance1:9191  |  |  instance2:9292  |  |  instance3:9393  |
|                  |  |                  |  |                  |
+------------------+  +------------------+  +------------------+
Run Code Online (Sandbox Code Playgroud)

我希望有一个单独的应用程序能够查询这些实例中的状态存储:

             consumer group                         separate application
+---------------------------------------+              _____
|   [instance1] [instance2] [instance3] |  <~-------  | app |
+---------------------------------------+              -----
Run Code Online (Sandbox Code Playgroud)

单独的应用程序将利用相同的逻辑StreamsMetadataState::getAllMetadataForStore来获取我的应用程序正在运行的实例的所有活动主机/端口对,通过REST服务运行远程查询,并将数据汇总到其自身的应用程序逻辑中。

但是,我很难实现这一点。由于主机/端口对似乎是通过使用者组协议进行通信的,因此看起来我需要实际实例化另一个Kafka Streams实例(即使用者组的另一个成员),以便利用远程交互式查询。

我的问题是:

  • 是否可以找到应用程序的所有正在运行实例的主机/值对,而无需在同一使用者组中也运行本地Kafka Streams实例?(我强调运行,因为我不介意仅为了获取主机/端口元数据而实例化Kafka Streams应用程序的虚拟实例,但是有一项validateRunning检查阻止了我这样做)
  • 以上设计是否存在问题(运行单独的应用程序以查询Kafka Streams应用程序的所有实例)?即也许我所谈论的行为不被支持,因为我在做的事情有我尚未考虑的后果?

似乎应该有一个获取状态存储元数据的静态方法,该方法将允许我们直接传递从构建器对象中提取的所有值。即

KafkaStreams::getMetaDataForStore(streamsConfig, storeName);
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-streams

5
推荐指数
1
解决办法
662
查看次数

Cloud Spanner - WHERE 子句中大量项目的读取性能

我正在评估一个项目的一些不同的数据存储,并且我有一个奇怪但不灵活的要求来检查每个查询是否存在 1500 个键...基本上我要运行的唯一查询的形式是:

SELECT user_id, name, gender
WHERE user_id in (user1, user2, ..., user1500)
Run Code Online (Sandbox Code Playgroud)

表中大约有 35 亿行。Spanner 是一个引起我注意的数据存储。我想知道以这种方式查询数据是否可行,或者是否会由于子句中的项目数量过多而遇到性能问题WHERE。到目前为止,我只能在少量数据上测试这些查询,因此我更倾向于理论性能影响,而不是仅仅“尝试并发现”。

另外,是否有其他数据存储可能更适合这种读取模式?我预计每秒运行的查询不超过 80 个。此外,数据将每周批量加载。数据本质上是结构化的,但我们不以关系方式使用它(即没有连接)。

无论如何,如果这个问题有任何含糊之处,我们很抱歉。如果需要,我很乐意提供更多详细信息。

google-cloud-platform google-cloud-spanner

2
推荐指数
1
解决办法
919
查看次数

Kafka Streams - 减少大型国营商店的内存占用

我有一个拓扑(见下文),它读取了一个非常大的主题(每天超过十亿条消息).这个Kafka Streams应用程序的内存使用率非常高,我正在寻找一些关于如何减少州商店足迹的建议(详情如下).注意:我并不是想把山羊放到国营商店,我只是觉得可能有办法改善我的拓扑结构 - 见下文.

// stream receives 1 billion+ messages per day
stream
    .flatMap((key, msg) -> rekeyMessages(msg))
    .groupBy((key, value) -> key)
    .reduce(new MyReducer(), MY_REDUCED_STORE)
    .toStream()
    .to(OUTPUT_TOPIC);

// stream the compacted topic as a KTable
KTable<String, String> rekeyedTable = builder.table(OUTPUT_TOPIC, REKEYED_STORE);


// aggregation 1
rekeyedTable.groupBy(...).aggregate(...)

// aggreation 2
rekeyedTable.groupBy(...).aggregate(...)

// etc
Run Code Online (Sandbox Code Playgroud)

更具体地说,我想知道OUTPUT_TOPIC作为KTable 流式传输是否导致状态存储(REKEYED_STORE)大于它需要在本地存储.对于具有大量唯一键的changelog主题,将它们作为a KStream和窗口化聚合进行流式传输会更好吗?或者这不会像我想的那样减少占用空间(例如,只有一部分记录 - 窗口中的那些记录将存在于本地状态存储中).

无论如何,我总是可以启动这个应用程序的更多实例,但我想让每个实例尽可能高效.这是我的问题:

  • 是否有任何配置选项,一般策略等应该考虑具有此级别吞吐量的Kafka Streams应用程序?
  • 单个实例的内存密集程度应该有什么指导原则吗?即使您有一些武断的指南,与他人分享可能会有所帮助.我的一个实例目前正在使用15GB的内存 - 我不知道这是好/坏/无关紧要.

任何帮助将不胜感激!

apache-kafka apache-kafka-streams

1
推荐指数
1
解决办法
2134
查看次数