Kafka Streams - Explain why a KTable and its associated store are only updated every 30 seconds

cod*_*ent 1 apache-kafka apache-kafka-streams

I have this simple KTable definition that generates a Store:

KTable<String, JsonNode> table = kStreamBuilder.<String, JsonNode>table(ORDERS_TOPIC, ORDERS_STORE);
table.print();

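I publish messages to ORDERS_TOPIC, but the store is only really updated every 30 seconds. This is the log, which includes a message about committing because the 30000 ms commit interval has elapsed: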

2017-07-25 23:53:15.465 DEBUG 17540 --- [ StreamThread-1] o.a.k.c.consumer.internals.Fetcher       : Sending fetch for partitions [orders-0] to broker EXPRF026.SUMINISTRADOR:9092 (id: 0 rack: null)
2017-07-25 23:53:15.567  INFO 17540 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Committing all tasks because the commit interval 30000ms has elapsed
2017-07-25 23:53:15.567  INFO 17540 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Committing task StreamTask 0_0
2017-07-25 23:53:15.567 DEBUG 17540 --- [ StreamThread-1] o.a.k.s.processor.internals.StreamTask   : task [0_0] Committing its state
2017-07-25 23:53:15.567 DEBUG 17540 --- [ StreamThread-1] o.a.k.s.p.i.ProcessorStateManager        : task [0_0] Flushing all stores registered in the state manager
f2b9ff2b-62c3-470e-8df1-066cd1e3d5ec
{"uid":"string","productId":0,"orderId":"f2b9ff2b-62c3-470e-8df1-066cd1e3d5ec","name":"OrderPlaced","state":"PENDING_PRODUCT_RESERVATION"}
[KTABLE-SOURCE-0000000001]: f2b9ff2b-62c3-470e-8df1-066cd1e3d5ec , ({"uid":"string","productId":0,"orderId":"f2b9ff2b-62c3-470e-8df1-066cd1e3d5ec","name":"OrderPlaced","state":"PENDING_PRODUCT_RESERVATION"}<-null)
2017-07-25 23:53:15.569 DEBUG 17540 --- [ StreamThread-1] o.a.k.s.state.internals.ThreadCache      : Thread order-service-streams-16941f70-87b3-45f4-88de-309e4fd22748-StreamThread-1 cache stats on flush: #puts=1, #gets=1, #evicts=0, #flushes=1
2017-07-25 23:53:15.576 DEBUG 17540 --- [ StreamThread-1] o.a.k.s.p.internals.RecordCollectorImpl  : task [0_0] Flushing producer

I found that the property controlling this is commit.interval.ms:

props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);

Why is the default set to 30000 ms (which sounds like a long time), and what are the implications of changing it to 10 ms?

If, instead of a KTable, I use a KStream...

KStream<String, JsonNode> kStream = kStreamBuilder.stream(ORDERS_TOPIC);
kStream.print();

...I can see the messages right away, without having to wait those 30000 ms. Why the difference?

Mat*_*Sax 6

It is related to memory management, in particular the KTable caches: http://docs.confluent.io/current/streams/developer-guide.html#memory-management

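The KTable is actually updated all the time, and if you use "Interactive Queries" to access the underlying state store, you can get each update immediately. However, the KTable cache buffers the updates to reduce downstream load, and each time a commit is triggered, this cache needs to be flushed downstream to avoid data loss in case of failure. If your cache size is small, you might also see downstream records before a commit when a key gets evicted from the cache.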
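To make the buffering behaviour more concrete, here is a toy model in plain Java. It is only a sketch and not the Kafka Streams cache implementation: the class `ToyTableCache`, its methods, and the entry-count limit are all hypothetical (the real cache is sized in bytes). It shows the three effects described above: reads see every update at once, updates to the same key are collapsed until a commit, and a small cache forwards records early through eviction.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: NOT the Kafka Streams cache implementation.
class ToyTableCache {
    // Stands in for the state store: always holds the latest value per key.
    private final Map<String, String> store = new LinkedHashMap<>();
    // Updates that have not been forwarded downstream yet, oldest first.
    private final LinkedHashMap<String, String> pending = new LinkedHashMap<>();
    // Hypothetical size limit, counted in entries for simplicity.
    private final int maxEntries;
    // Records that downstream operators (such as print()) would see.
    final List<String> downstream = new ArrayList<>();

    ToyTableCache(int maxEntries) {
        this.maxEntries = maxEntries;
    }

    void put(String key, String value) {
        store.put(key, value);      // the table itself is updated immediately
        pending.remove(key);        // a newer update replaces the buffered one
        pending.put(key, value);
        while (pending.size() > maxEntries) {
            // Cache is full: evict the oldest buffered update and forward it.
            Iterator<Map.Entry<String, String>> it = pending.entrySet().iterator();
            Map.Entry<String, String> eldest = it.next();
            downstream.add(eldest.getKey() + "=" + eldest.getValue());
            it.remove();
        }
    }

    // Models a direct lookup against the store (as with Interactive Queries).
    String get(String key) {
        return store.get(key);
    }

    // Models a commit: everything still buffered is flushed downstream.
    void commit() {
        for (Map.Entry<String, String> e : pending.entrySet()) {
            downstream.add(e.getKey() + "=" + e.getValue());
        }
        pending.clear();
    }

    public static void main(String[] args) {
        ToyTableCache table = new ToyTableCache(10);
        table.put("order-1", "PENDING");
        table.put("order-1", "CONFIRMED");
        System.out.println(table.get("order-1")); // CONFIRMED
        System.out.println(table.downstream);     // []
        table.commit();
        System.out.println(table.downstream);     // [order-1=CONFIRMED]
    }
}
```

In this model a limit of 0 entries forwards every update as soon as it arrives, which mirrors what happens when the cache is too small to hold anything.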

About the commit interval: in general, the commit interval is set to a relatively large value to reduce the commit load on the brokers.
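If the goal is only to see every KTable update downstream without committing more often, one option is to leave the commit interval alone and shrink the record cache instead. The sketch below builds such a configuration with plain `java.util.Properties` and string keys. `commit.interval.ms` and `cache.max.bytes.buffering` are the Kafka Streams setting names of that era, while the class and method names are hypothetical. Treat it as an assumption to check against the configuration reference of the version you run.

```java
import java.util.Properties;

// Hypothetical helper: only the two property names come from Kafka Streams.
class StreamsCacheSettings {
    static Properties forwardEveryUpdate() {
        Properties props = new Properties();
        // Keep the 30000 ms commit interval seen in the log above,
        // so the commit load on the brokers stays low.
        props.setProperty("commit.interval.ms", "30000");
        // A cache of 0 bytes disables buffering, so each KTable update
        // is forwarded downstream instead of waiting for the next commit.
        props.setProperty("cache.max.bytes.buffering", "0");
        return props;
    }

    public static void main(String[] args) {
        Properties props = forwardEveryUpdate();
        System.out.println(props.getProperty("commit.interval.ms"));
        System.out.println(props.getProperty("cache.max.bytes.buffering"));
    }
}
```

These entries would be merged into the same properties object that is passed to the streams application in the question. The trade-off is more downstream records, since consecutive updates to the same key are no longer collapsed.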