flink:丢失记录了吗?

eth*_*nny 1 java apache-kafka apache-flink

我的拓扑结构如下:( kafka(p:6)->reduce(p:6)->db writer(p:12)其中p:是并行性).

  • 我让它在单个节点"集群"上运行 taskmanager.numberOfTaskSlots: 30
  • 我知道我的kafka源正在产生~65M记录/分钟
  • kafka'读者'具有与kafka分区相同的并行度

当我观察这个工作(通过flink UI)约1分钟时,这些是我看到的值:

  • kafka - > reduce:~1.5M记录发送(关闭> 4x)
  • 减少(窗口聚合5秒) - > db写入~114K记录发送(关闭> 2x)1
  • db write - >收到的记录:~23K(关闭> 5x)2

(其他部分的发送/接收值之间存在较小的差异,但我可以将这些差异归因于测量误差)

问题:
1.其余记录在哪里?
2.运行时,此机器上的负载永远不会超过1.5.还有其他一些限制因素吗?
3.我误读了UI中的值吗?

Java 8
Flink 1.0(最新github)
机器:32核/ 96 Gb RAM

1这可以通过汇总过程来解释.
2此值与写入数据库的内容对齐.

Ste*_*wen 6

Flink不会丢失记录,它们只是在飞行中缓冲,或者它们在Kafka停留的时间更长.从数字来看,看起来你正在经历背压.

您可以看到"reducer"已经发出了许多"db writer"尚未收到的记录.在这种情况下,这些记录仍然在运营商之间的通信信道的缓冲区中.这些通道具有有限的缓冲量(取决于配置的缓冲区的数量,通常为几MB).对于小记录,他们可能会持有多个10k记录.

如果一个运营商中发送的记录数量明显落后于接收运营商收到的记录,则表明接收者(此处为"数据库编写者")无法跟上数据速率.也许是因为DB没有足够快地处理插入(太同步,太细粒度的提交?),"db writer"和DB之间的网络可能已经饱和.

在这种情况下,"db writer"将对减速器进行反压,最终也会对Kafka Source进行反压.

要尝试在没有数据库背压的情况下数据速率是多少,您可以尝试"db writer"只删除所有记录的实验.