一次触发Firebase Cloud功能的最佳做法

Ant*_*nio 6 firebase google-cloud-functions

每次创建新的Firebase Auth用户时,我只需触发一次Firebase Cloud Function。我已经编写了一个可以正常工作的函数,该函数使用onCreate触发器为每个用户发送一封电子邮件。该函数发送欢迎电子邮件并跟踪一些分析数据,因此它不是幂等的。

这里的问题是Google多次调用该函数。这不是“错误”,而是预期的行为,开发人员必须对其进行处理。

我想知道将“至少一次”行为更改为“恰好一次”行为的最佳实践是什么。

现在会发生什么:

  1. 新用户“ A”注册。
  2. Google为用户A触发“ sendWelcomeEmail”。
  3. Google为用户A AGAIN触发“ sendWelcomeEmail” 。

一次运行该函数并中止/跳过同一用户的任何其他调用的最佳方法是什么?

dam*_*nix 10

我遇到了类似的问题,没有简单的解决方案。我发现对于任何使用外部系统的操作,都不可能使这样的函数具有幂等性。我正在使用 TypeScript 和 Firestore。

要解决此问题,您需要使用Firebase 事务。只有使用事务,您才能面对多次触发函数时发生的竞争条件,通常是同时触发。

我发现这个问题有两个级别:

  1. 您没有幂等功能,您只需要发送电子邮件即可成为幂等的。
  2. 您有一组幂等函数,需要执行一些需要与外部系统集成的操作。

这种整合的例子有:

  • 发送电子邮件
  • 连接到支付系统

1.对于非幂等函数(简单案例场景)

async function isFirstRun(user: UserRecord) {
  return await admin.firestore().runTransaction(async transaction => {
    const userReference = admin.firestore().collection('users').doc(user.uid);

    const userData = await transaction.get(userReference) as any
    const emailSent = userData && userData.emailSent
    if (!emailSent) {
      transaction.set(userReference, { emailSent: true }, { merge: true })
      return true;
    } else {
      return false;
    }
  })
}

export const onUserCreated = functions.auth.user().onCreate(async (user, context) => {
  const shouldSendEmail = await isFirstRun(user);
  if (shouldSendEmail) {
    await sendWelcomeEmail(user)
  }
})
Run Code Online (Sandbox Code Playgroud)

PS 您还可以使用内置eventId字段过滤掉重复的事件触发。请参阅https://cloud.google.com/blog/products/serverless/cloud-functions-pro-tips-building-idempotent-functions。所需的工作将是可比的——您仍然必须存储已处理的操作或事件。


2.对于一组已经幂等的函数(真实案例场景)

为了使用一组已经是幂等的函数来完成这项工作,我切换到了排队系统。我将操作推送到集合并利用 Firebase 事务将操作的执行“锁定”到一次仅一个函数。

我会尽量把最小的例子放在这里。

部署动作处理函数

export const onActionAdded = functions.firestore
  .document('actions/{actionId}')
  .onCreate(async (actionSnapshot) => {
    const actionItem: ActionQueueItem = tryPickingNewAction(actionSnapshot)

    if (actionItem) {
      if (actionItem.type === "SEND_EMAIL") {
        await handleSendEmail(actionItem)
        await actionSnapshot.ref.update({ status: ActionQueueItemStatus.Finished } as ActionQueueItemStatusUpdate)
      } else {
        await handleOtherAction(actionItem)
      }
    }
  });

/** Returns the action if no other Function already started processing it */
function tryPickingNewAction(actionSnapshot: DocumentSnapshot): Promise<ActionQueueItem> {
  return admin.firestore().runTransaction(async transaction => {
    const actionItemSnapshot = await transaction.get(actionSnapshot.ref);
    const freshActionItem = actionItemSnapshot.data() as ActionQueueItem;

    if (freshActionItem.status === ActionQueueItemStatus.Todo) {
      // Take this action
      transaction.update(actionSnapshot.ref, { status: ActionQueueItemStatus.Processing } as ActionQueueItemStatusUpdate)
      return freshActionItem;
    } else {
      console.warn("Trying to process an item that is already being processed by other thread.");
      return null;
    }
  })
}
Run Code Online (Sandbox Code Playgroud)

像这样将动作推送到集合

admin.firestore()
    .collection('actions')
    .add({
      created: new Date(),
      status: ActionQueueItemStatus.Todo,
      type: 'SEND_EMAIL',
      data: {...}
    })
Run Code Online (Sandbox Code Playgroud)

打字稿定义

export enum ActionQueueItemStatus {
  Todo = "NEW",
  Processing = "PROCESSING",
  Finished = "FINISHED"
}

export interface ActionQueueItem {
  created: Date
  status: ActionQueueItemStatus
  type: 'SEND_EMAIL' | 'OTHER_ACTION'
  data: EmailActionData
}

export interface EmailActionData {
  subject: string,
  content: string,
  userEmail: string,
  userDisplayName: string
}
Run Code Online (Sandbox Code Playgroud)

您可能需要使用更丰富的状态及其更改来调整它,但这种方法应该适用于任何情况,并且提供的代码应该是一个很好的起点。这也不包括重新运行失败操作的机制,但它们很容易找到。

如果你知道一个更简单的方法 - 请告诉我如何:)

祝你好运!

  • 您可以使用事件 ID 来标识对同一函数的多次调用。请参阅 https://cloud.google.com/blog/products/serverless/cloud-functions-pro-tips-building-idempot-functions (3认同)