Google Cloud PubSub:不发送/接收来自Cloud Functions的所有消息

Sul*_*lla 1 python google-cloud-pubsub google-cloud-functions

摘要:我的客户代码通过将消息发布到发布/订阅主题来触发861后台Google Cloud Function。每个Cloud Function都执行一个任务,将结果上传到Google Storage,并将消息发布到客户端代码正在监听的另一个Pub / Sub主题。尽管执行了所有Cloud Functions(通过Google Storage中的结果数验证),客户端代码不会接收所有消息。

服务器端:我有一个后台Google Cloud Function,每次将消息发布到TRIGGER Pub / Sub主题时都会触发。消息数据的自定义属性根据功能执行特定任务的方式充当功能参数。然后将结果上传到Google存储空间中的存储桶,并将一条消息(带有taskID和执行时间详细信息)发布到RESULTS Pub / Sub主题(与用于触发此功能的消息不同)。

客户端:我需要执行861个不同的任务,这需要使用861个稍微不同的输入来调用Cloud Function。这些任务是相似的,云功能执行它们需要20秒钟至2分钟(中位数约为1分钟)。我为此创建了一个python脚本,该脚本是从Google Cloud Shell(或本地计算机Shell)运行的。客户端python脚本向TRIGGER Pub / Sub主题发布861条消息,该消息同时触发了多个Cloud Function,每个函数都在范围[0,860]中传递了唯一的taskID。然后,客户端python脚本以“同步提取”方式轮询RESULTS Pub / Sub主题,以查找任何消息。执行任务后,Cloud Function将使用唯一的taskID和计时详细信息将消息发布到RESULTS Pub / Sub主题。客户端使用此唯一的taskID来标识消息来自哪个任务。它还有助于识别被丢弃的重复消息。

基本步骤

  1. 客户端python脚本将861条消息(每条消息具有唯一的taskID)发布到TRIGGER Pub / Sub主题,并等待来自Cloud Function的结果消息。
  2. 调用了861个不同的Cloud Functions,每个函数执行一个任务,将结果上传到Google Storage,并将消息(带有taskID和执行时间详细信息)发布到RESULTS Pub / Sub主题。
  3. 客户端同步获取所有消息,并将任务标记为完成。

问题:当客户端从RESULTS Pub / Sub主题轮询消息时,我没有收到所有taskID的消息。我确定Cloud Function已被调用并正确执行(我在Google存储桶中有861个结果)。我重复了几次,每次都发生。奇怪的是,每次运行时丢失的taskID的数量都会更改,并且不同的taskID也会丢失。我还跟踪收到的重复taskID的数量。表中给出了5次独立运行所接收,丢失和重复的唯一taskID的数量。

SN   # of Tasks  Received  Missing  Repeated
1     861          860      1        25
2     861          840      21       3
3     861          851      10       1
4     861          837      24       3
5     861          856      5        1
Run Code Online (Sandbox Code Playgroud)

我不确定这个问题可能来自哪里。考虑到数字的随机性以及缺少的taskID,我怀疑Pub / Sub至少一次传递逻辑中存在一些错误。如果在Cloud Function中,我睡了几秒钟而不是执行任务(例如使用time.sleep(5)),则一切正常(我在客户端收到了所有861 taskID)。

重现此问题的代码。

在下面,main.pyrequirements.txt一起作为client.py客户端代码部署为Google Cloud Function 。用100个并发任务运行客户端,python client.py 100该任务重复5次。每次都会丢失不同数量的taskID。

requirements.txt

google-cloud-pubsub
Run Code Online (Sandbox Code Playgroud)

main.py

SN   # of Tasks  Received  Missing  Repeated
1     861          860      1        25
2     861          840      21       3
3     861          851      10       1
4     861          837      24       3
5     861          856      5        1
Run Code Online (Sandbox Code Playgroud)

client.py

google-cloud-pubsub
Run Code Online (Sandbox Code Playgroud)

Dan*_*ins 7

在您的云功能中,此行:

Publisher.publish(topic_path,data = data,taskID = taskID)

您不是在等待Publisher.publish返回的未来。这意味着您无法保证当您离开该gcf_run函数的结尾时,实际上已经在该主题上进行了发布,但是无论如何,TRIGGER主题云函数订阅上的消息还是会被确认。

相反,要等到发布发生云功能终止时,这应该是:

publisher.publish(topic_path, data=data, taskID=taskID).result()
Run Code Online (Sandbox Code Playgroud)

您还应该避免在每次函数调用时启动和关闭发布者客户端,而应将客户端作为全局变量。