谷歌云发布订阅数据丢失

xiu*_*shi 5 google-cloud-messaging google-cloud-platform google-cloud-pubsub

我遇到了 GCP pubsub 的问题,即在几秒钟内发布数千条消息时丢失了一小部分数据。

我正在message_id从 pubsub 和一个session_id唯一的发布端和接收端的每条消息进行记录,我看到的结果是接收端的某些消息具有相同session_id但不同的message_id. 此外,还丢失了一些消息。

例如,在一项测试中,我向 pubsub 发送了 5,000 条消息,并且恰好收到了 5,000 条消息,其中 8 条消息丢失。日志丢失消息如下所示:

MISSING sessionId:sessionId: 731 (missing in log from pull request, but present in log from Flask API)

messageId FOUND: messageId:108562396466545

API: 200 **** sessionId: 731, messageId:108562396466545 ******(Log from Flask API)

Pubsub: sessionId: 730, messageId:108562396466545(Log from pull request)
Run Code Online (Sandbox Code Playgroud)

重复项看起来像:

======= Duplicates FOUND on sessionId: 730=======

sessionId: 730, messageId:108562396466545

sessionId: 730, messageId:108561339282318

(both are logs from pull request)
Run Code Online (Sandbox Code Playgroud)

所有丢失的数据和重复项看起来像这样。

从上面的例子可以看出,有些消息已经取了message_id另一条消息的message_ids ,并且用两个不同的s发送了两次。

我想知道是否有人能帮我弄清楚发生了什么?提前致谢。

代码

我有一个向 pubsub 发送消息的 API,如下所示:

MISSING sessionId:sessionId: 731 (missing in log from pull request, but present in log from Flask API)

messageId FOUND: messageId:108562396466545

API: 200 **** sessionId: 731, messageId:108562396466545 ******(Log from Flask API)

Pubsub: sessionId: 730, messageId:108562396466545(Log from pull request)
Run Code Online (Sandbox Code Playgroud)

这是我用来从 pubsub 读取的代码:

从 google.cloud 导入 pubsub 导入重新导入 json

ps = pubsub.Client()
topic = ps.topic('test-xiu')
sub = topic.subscription('TEST-xiu')

max_messages = 1
stop = False

messages = []

class Message(object):
    """docstring for Message."""
    def __init__(self, sessionId, messageId):
        super(Message, self).__init__()
        self.seesionId = sessionId
        self.messageId = messageId


def pull_all():
    while stop == False:

        m = sub.pull(max_messages = max_messages, return_immediately = False)

        for data in m:
            ack_id = data[0]
            message = data[1]
            messageId = message.message_id
            data = message.data
            event = json.loads(data)
            sessionId = str(event["sessionId"])
            messages.append(Message(sessionId = sessionId, messageId = messageId))

            print '200 **** sessionId: ' + sessionId + ", messageId:" + messageId + " ******"

            sub.acknowledge(ack_ids = [ack_id])

pull_all()
Run Code Online (Sandbox Code Playgroud)

为了生成 session_id,从 API 发送请求和记录响应:

======= Duplicates FOUND on sessionId: 730=======

sessionId: 730, messageId:108562396466545

sessionId: 730, messageId:108561339282318

(both are logs from pull request)
Run Code Online (Sandbox Code Playgroud)

更新

我对 API 进行了更改,问题似乎消失了。我所做的更改不是pubsub.Client()对所有请求都使用一个,而是为每个传入的请求初始化一个客户端。新 API 如下所示:

from flask import Flask, request, jsonify, render_template
from flask_cors import CORS, cross_origin
import simplejson as json
from google.cloud import pubsub
from functools import wraps
import re
import json


app = Flask(__name__)
ps = pubsub.Client()

...

@app.route('/publish', methods=['POST'])
@cross_origin()
@json_validator
def publish_test_topic():
    pubsub_topic = 'test_topic'
    data = request.data

    topic = ps.topic(pubsub_topic)

    event = json.loads(data)

    messageId = topic.publish(data)
    return '200 **** sessionId: ' + str(event["sessionId"]) + ", messageId:" + messageId + " ******"
Run Code Online (Sandbox Code Playgroud)

xiu*_*shi 4

与 Google 的一些人交谈,这似乎是 Python 客户端的问题:

我们这边的共识是当前的python客户端存在线程安全问题。正如我们所说,客户端库几乎是从头开始重写的,所以我不想在当前版本中寻求任何修复。我们预计新版本将于六月底推出。

在 app.yaml 中使用 thread_safe: false 运行当前代码或更好,但只需在每次调用中实例化客户端应该是解决方法 - 您找到的解决方案。

详细解决方案请参阅问题中的更新