你如何处理亚马逊Kinesis记录重复?

Ant*_*nio 13 amazon-web-services amazon-elasticache amazon-dynamodb amazon-kinesis amazon-elasticsearch

根据Amazon Kinesis Streams 文档,可以多次传送记录.

确保仅处理每个记录的唯一方法是将它们临时存储在支持完整性检查的数据库中(例如DynamoDB,Elasticache或MySQL/PostgreSQL),或者只检查每个Kinesis分片的RecordId.

您是否知道更好/更有效的处理重复方法?

Dmi*_*bin 15

在为移动应用构建遥测系统时,我们遇到了这个问题.在我们的例子中,我们也不确定生产者只发送一次消息,因此对于每个接收到的记录,我们在运行中计算了MD5并检查它是否以某种形式的持久存储器呈现,但实际上使用的是什么存储器.最棘手的一点.

首先,我们尝试了琐碎的关系数据库,但它很快成为整个系统的一个主要瓶颈,因为这不仅仅是读取繁重而且写得很重的情况,因为通过Kinesis的数据量非常大.

我们最终得到了一个DynamoDB表,用于存储每个唯一消息的MD5.我们遇到的问题是删除消息并不容易 - 即使我们的表包含分区和排序键,DynamoDB也不允许删除具有给定分区键的所有记录,我们必须查询所有要获取的消息排序键值(浪费时间和容量).不幸的是,我们不得不偶尔放下整个桌子.另一种不太理想的解决方案是定期旋转存储消息标识符的DynamoDB表.

然而,最近DynamoDB引入了一个非常方便的功能 - Time To Live,这意味着现在我们可以通过在每个记录的基础上启用自动到期来控制表的大小.在这种意义上,DynamoDB似乎与ElastiCache非常相似,但是ElastiCache(至少是Memcached集群)的耐用性要低得多 - 那里没有冗余,并且在运行或失败的情况下,所有驻留在终止节点上的数据都会丢失.

  • 嗨@Antonio.在我们的例子中,生产者可能会多次发布相同的消息.如果是这种情况,那么Kinesis无论如何都会将它们视为不同的消息(仅仅因为生产者有2个或更多的帖子).由于我们知道每条消息都必须是唯一的,我们只是忽略了已经看过md5的消息.此外,md5由生产者计算,为消费者节省了一些计算时间(假设通过Kinesis的数据量相对较大). (3认同)
  • 嗨德米特里。我正在使用类似于此处解释的 JustGiving 基础设施的东西运行多个基准:https://aws.amazon.com/blogs/compute/serverless-cross-account-stream-replication-using-aws-lambda-amazon-dynamodb-and -amazon-kinesis-firehose/ . 为什么为 DDB 表计算 MD5 校验和而不是使用 Shardid + SequenceNumber? (2认同)

az3*_*az3 12

你提到的事情是所有队列系统都有"至少一次"方法的一般问题.此外,不仅是队列系统,生产者和消费者都可以多次处理相同的消息(由于ReadTimeout错误等).Kinesis和Kafka都使用这种范式.不幸的是,没有一个简单的答案.

您也可以尝试使用"一次性"消息队列,使用更严格的事务处理方法.例如AWS SQS执行此操作:https://aws.amazon.com/about-aws/whats-new/2016/11/amazon-sqs-introduces-fifo-queues-with-exactly-once-processing-and-lower -prices-for-standard-queues /.请注意,SQS吞吐量远小于Kinesis.

要解决您的问题,您应该了解您的应用程序域并尝试在内部解决它(如数据库检查).特别是当您与外部服务(例如电子邮件服务器)通信时,您应该能够恢复操作状态以防止双重处理(因为在电子邮件服务器示例中双重发送,可能会导致多个副本收件人邮箱中的相同帖子).

另见以下概念;

  1. 至少一次交付:http://www.cloudcomputingpatterns.org/at_least_once_delivery/
  2. 完全一次交付:http://www.cloudcomputingpatterns.org/exactly_once_delivery/
  3. 幂等处理器:http://www.cloudcomputingpatterns.org/idempotent_processor/