Google Cloud BigTable:更新列值

Dar*_*hta 0 bigtable google-cloud-dataflow google-cloud-bigtable apache-beam

我有以下 BigTable 结构作为示例:

Table1 : column_family_1 : column_1 : value
Run Code Online (Sandbox Code Playgroud)

value是一个数字。这是由数据流管理的,我想每次都更新该值。

该值可能是一个金额,我想在每次用户购买时更新它(以维持迄今为止的总支出),因此我在购买事件侦听器数据流中执行以下操作(每当遇到购买事件时):

  • 发出 BigTable 请求,通过 id 获取值
  • 将新购买的金额添加到 BigTable 搜索响应中显示的金额中
  • 发出Put更新值的请求

尽管这种方法有一些网络延迟,但它似乎有效。失败的情况是,当数据流有多个工作人员时,用户进行多次购买,并且事件会发送给多个工作人员,例如:

  • Worker 1 获取事件 1,获取金额并将花费的金额添加到其中
  • Worker 2 获取事件 2,获取金额并将花费的金额添加到其中
  • 两个工人都提出Put请求,但他们都被覆盖了

为了防止这种情况,我试图提出一个仅以纯文本形式表示的请求,add 10 to the spent amount value. 这是我们可以在数据流中做的事情吗?

Sol*_*kis 5

Bigtable 有能力Increment评估。您可以在protobuf 文档中查看更多详细信息

幂等性对于理解 Bigtable 中的计数器起着重要作用。在 Bigtable 中,Puts 通常是幂等的,这意味着您可以多次运行它们并始终得到相同的结果(a=2无论运行多少次都会产生相同的结果)。Increments 不是幂等的,因为多次运行它们会产生不同的结果(a++,与, ,a++具有不同的结果)。a++a++a++

瞬时故障可能会也可能不会执行Increment。客户端永远不清楚Increment在这些暂时性错误期间是否成功。

Increment由于这种幂等性,在 Dataflow 中构建此功能很复杂。数据流有一个“捆绑”的概念,它是充当工作单元的一组操作。这些捆绑包会因暂时性故障而重试(您可以在此处阅读有关数据流暂时性故障重试的更多信息)。Dataflow 将该“捆绑包”视为一个单元,但由于 Cloud Bigtable 不支持多行事务,因此 Cloud Bigtable 必须将“捆绑包”中的每个单独项目视为不同的事务。

鉴于“捆绑包”的预期行为不匹配,Cloud Bigtable 将不允许您Increment通过 Dataflow 运行。

您拥有的选项值得比我在这里提供的更多文档,但我可以提供一些高级选项:

  1. 始终用于Put您发现的任何新事件,并总结读取值。您还可以编写另一个作业,通过创建删除所有当前值的“事务”来定期清理行,并写入一个包含总和的新单元格

  2. 使用Cloud Functions侦听 Pub/Sub 事件并执行Increment操作。以下是使用 Cloud Functions 的 Cloud Bigtable 示例。您还可以执行 a Get、执行加法并使用CheckAndMutate您在帖子中描述的算法执行 a (CheckAndMutate如果我选择此选项,我个人会选择一致性)。

  3. 用于AbstractCloudBigtableTableDoFn编写您自己的DoFn执行Increments 或 的函数CheckAndMutate,但要理解这可能会导致数据完整性问题。

如果系统足够大,选项#1 是最强大的选项,但代价是系统复杂性。如果您不想要这种复杂性,那么选项 #2 是您的下一个最佳选择(尽管我会选择CheckAndMutate)。如果您不关心数据完整性并且需要高吞吐量(例如“页数”或其他遥测数据,在一小部分时间内出错是可以的),那么选项#3将是您最好的选择。