小编war*_*iak的帖子

Kafka Streams:有一个缓慢的标点工作的负面后果是什么?

在我们的 Kafka Streams 拓扑中,我们有一些标点,可能需要很长时间才能运行(几分钟)。

如此缓慢的标点符号有什么后果?它们运行的​​进程会在 punctuate 运行时停止处理记录吗?它们会导致重新平衡吗?

apache-kafka apache-kafka-streams

5
推荐指数
1
解决办法
720
查看次数

Kafka GlobalKTable延迟问题

我有一个主题,该主题在商店中被读取为GlobalKTable并已实现。问题是,如果我更新了该主题的键,然后从存储中读取,则有一段时间(〜0.5秒),我得到了旧值。

这个问题可能是什么原因?

全局表是否将每个应用程序实例的数据存储在rocksDB中,因此,如果更新了另一个分区上的键,则需要花费一些时间从所有分区中提取数据并更新其本地rocksDB。如果不是,请说明globalktable存储如何在内部维护其状态?

上述问题如何解决?在应该期望一致性与mysql数据库相匹配的这种情况下,我们不应该使用globalktable吗?

apache-kafka apache-kafka-streams

3
推荐指数
1
解决办法
234
查看次数

KStream 将记录发送到多个流(不是分支)

有没有办法进行类似分支的操作​​,但将记录放在每个谓词评估为真的输出流中?Brach 将记录放置到第一个匹配项(文档:在第一个匹配项中将记录放置到一个且仅一个输出流中)。

apache-kafka apache-kafka-streams spring-kafka

2
推荐指数
1
解决办法
1242
查看次数

JSON 对象的 Kafka 流消费者:如何映射

我是 Kafka/Kafka Stream 的新手。我正在使用最新的Kafka/kafka-stream 和 kafka-client 以及openjdk11。我的生产者正在生产 json 对象(其中名称),看起来像

{"Name":"John", "amount":123, "time":2019-10-03T05:24:52" }
Run Code Online (Sandbox Code Playgroud)

生产者代码以便更好地理解:

public static ProducerRecord<String, String> newRandomTransaction(String name) {
    // creates an empty json {}
    ObjectNode transaction = JsonNodeFactory.instance.objectNode();

    Integer amount = ThreadLocalRandom.current().nextInt(0, 100);

    // Instant.now() is to get the current time
    Instant now = Instant.now();

    // we write the data to the json document
    transaction.put("name", name);
    transaction.put("amount", amount);
    transaction.put("time", now.toString());
    return new ProducerRecord<>("bank-transactions", name, transaction.toString());
}
Run Code Online (Sandbox Code Playgroud)

现在我正在尝试编写我的应用程序来消耗交易并计算该人余额中的总金额。

( …

java lambda apache-kafka apache-kafka-streams

2
推荐指数
1
解决办法
1518
查看次数

如何扩展 Kafka Stream 应用程序

我对 kafka 文档中关于这个主题的措辞有点困惑,所以我想在这里问我是否正确地解释了这些内容?

因此,如果我正确理解这种扩展 Kafka Stream 应用程序的唯一方法是启动应用程序的新实例(或增加 application 中的流线程数量),这将确保 ConsumerGroup('application. id'),这样我就可以将流应用程序扩展到主题的分区数量(如果我的流拓扑连接到多个主题,实际上会发生什么,假设 TopicA 有 5 个分区,topicB 有 3 个分区,我加入了 TopicA 和 TopicB 的流,我猜在这种情况下我可以扩展到 3 个实例/线程)。

现在假设我有一个包含 5 个分区的 topicA,并且启动了应用程序的 3 个实例,如果我在拓扑中配置了 KTable,则每个 KTable 将包含来自特定分区的信息,并且我必须找出我的哪个实例(分区)上的元数据关键是,那么当我启动第四个实例时会发生什么,假设实例3上的KTable的键/值现在可以转到实例4上的KTable,不是吗?一方面问题是这样的重新平衡需要多长时间(我认为这取决于主题大小,所以假设需要 1 分钟,我正在查询 KTable 的应用程序在此操作期间会没有响应吗?)

附带问题是,此机制对于“streamBuilder.table(..)”和“streambuilder.groupByKey(..).reduce(..)”的工作原理是否完全相同?

最后一个问题,同样是一个具有 5 个分区的主题,但我没有启动 3 个应用程序实例,而是启动了一个具有 3 个流线程的实例 (num.stream.threads = 3),我会再次拥有 3 个 KTable 代表 5 个分区吗?如果我将线程大小从 3 更改为 4,其行为与增加实例数完全相同。

感谢您的回答..

apache-kafka apache-kafka-streams

1
推荐指数
1
解决办法
2359
查看次数