Jak*_*ski 5 java scala apache-kafka apache-kafka-streams
我有一个包含 50 个分区的输入主题,我正在尝试计算使用 Kafka Streams 接收到的消息总量。考虑以下拓扑。
var inputStream = builder.stream("input-topic", Consumed.with(...));
inputStream
// Grouping by a constant key here for global aggregation.
.groupBy((k, v) -> 1L, Serialized.with(...))
.count()
.toStream()
.foreach((k, v) -> System.out.println("Count updated to: " + v));
...
// For simplicity, let's consider the cache size to be zero.
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
Run Code Online (Sandbox Code Playgroud)
当我开始调整线程数量时,奇怪的事情发生了。
在具有 1 个线程的最简单示例中,计数随着收到的消息数量而良好增长。
将线程数设置为例如 50,会发生一些事情:
有人能指出我正确的方向吗?
| 归档时间: |
|
| 查看次数: |
117 次 |
| 最近记录: |