Google PubSub Python client returns StatusCode.UNAVAILABLE

adr*_*ino 6 python google-cloud-pubsub

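I'm trying to set up a long-running pull subscription to a Google Cloud PubSub topic. The code I'm using is very similar to the example given in the documentation here, namely: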

import time

from google.cloud import pubsub_v1


def receive_messages(project, subscription_name):
    """Receives messages from a pull subscription."""
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project, subscription_name)

    def callback(message):
        print('Received message: {}'.format(message))
        message.ack()

    subscriber.subscribe(subscription_path, callback=callback)

    # The subscriber is non-blocking, so we must keep the main thread from
    # exiting to allow it to process messages in the background.
    print('Listening for messages on {}'.format(subscription_path))
    while True:
        time.sleep(60)

The problem is that I sometimes get the following traceback:

Exception in thread Consumer helper: consume bidirectional stream:
Traceback (most recent call last):
  File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.5/threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "/path/to/google/cloud/pubsub_v1/subscriber/_consumer.py", line 248, in _blocking_consume
    self._policy.on_exception(exc)
  File "/path/to/google/cloud/pubsub_v1/subscriber/policy/thread.py", line 135, in on_exception
    raise exception
  File "/path/to/google/cloud/pubsub_v1/subscriber/_consumer.py", line 234, in _blocking_consume
    for response in response_generator:
  File "/path/to/grpc/_channel.py", line 348, in __next__
    return self._next()
  File "/path/to/grpc/_channel.py", line 342, in _next
    raise self
grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.UNAVAILABLE, The service was unable to fulfill your request. Please try again. [code=8a75])>

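I saw this error referenced in another question, but here I'm asking how to handle it properly in Python. I tried wrapping the request in a try/except, but it seems to run in the background, so I can't retry when that error occurs.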
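To illustrate why a try/except around the call does not help, here is a minimal standard-library sketch. It does not use the PubSub client at all: `consume` is a hypothetical stand-in for the library's background consumer, and `threading.excepthook` assumes Python 3.8 or newer (the traceback above comes from 3.5).

```python
import threading

caught_in_main = []   # what the try/except in the main thread catches
seen_in_thread = []   # what actually escaped inside the background thread


def record_thread_error(args):
    # threading.excepthook (Python 3.8+) receives uncaught thread exceptions.
    seen_in_thread.append(args.exc_value)


def consume():
    # Hypothetical stand-in for the consumer loop hitting a transient error.
    raise RuntimeError("simulated UNAVAILABLE")


threading.excepthook = record_thread_error

try:
    worker = threading.Thread(target=consume)
    worker.start()  # returns immediately, much like subscriber.subscribe(...)
    worker.join()
except RuntimeError as exc:
    caught_in_main.append(exc)

print("caught in main thread:", caught_in_main)
print("raised in worker thread:", seen_in_thread)
```

The exception is raised on the worker's stack, so the `except` clause in the main thread never sees it. Any retry logic therefore has to live wherever the background thread reports its errors.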

Kri*_*vis 5

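A somewhat hacky approach that worked for me is a custom policy_class. The default policy has an on_exception method that ignores DEADLINE_EXCEEDED. You can create a class that inherits from the default and also ignores UNAVAILABLE. Mine looks like this: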

from google.cloud import pubsub
from google.cloud.pubsub_v1.subscriber.policy import thread
import grpc

class AvailablePolicy(thread.Policy):
    def on_exception(self, exception):
        """The parent ignores DEADLINE_EXCEEDED. Let's also ignore UNAVAILABLE.

        I'm not sure what triggers that error, but if you ignore it, your
        subscriber seems to work just fine. It's probably an intermittent
        thing and it reconnects later if you just give it a chance.
        """
        # If this is UNAVAILABLE, then we want to retry.
        # That entails just returning None.
        unavailable = grpc.StatusCode.UNAVAILABLE
        if getattr(exception, 'code', lambda: None)() == unavailable:
            return
        # For anything else, fallback on super.
        super(AvailablePolicy, self).on_exception(exception)

subscriber = pubsub.SubscriberClient(policy_class=AvailablePolicy)
# Continue to set up as normal.

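It looks a lot like the original on_exception; it just ignores a different error. If you want, you can add some logging whenever the exception is raised and verify that everything still works. Future messages will still come through.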
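As a rough sketch of the logging idea, the status check can be pulled into a small helper that logs before deciding to ignore the error. To keep it runnable without grpc installed, `FakeStatusCode` and `FakeRpcError` are hypothetical stand-ins for `grpc.StatusCode` and the gRPC error object, and `should_ignore` is a made-up name that is not part of any library.

```python
import enum
import logging

logger = logging.getLogger("pubsub_policy_sketch")


class FakeStatusCode(enum.Enum):
    """Hypothetical stand-in for grpc.StatusCode."""
    UNAVAILABLE = "unavailable"
    PERMISSION_DENIED = "permission denied"


class FakeRpcError(Exception):
    """Hypothetical stand-in for the gRPC error; exposes code() like it."""

    def __init__(self, status):
        super().__init__(status.name)
        self._status = status

    def code(self):
        return self._status


def should_ignore(exception, ignorable=(FakeStatusCode.UNAVAILABLE,)):
    """Return True if the exception carries a status code worth retrying."""
    # Same duck-typed check as in on_exception above: exceptions without a
    # code() method fall through to the lambda and yield None.
    status = getattr(exception, "code", lambda: None)()
    if status in ignorable:
        logger.warning("Ignoring %s and letting the consumer retry", status)
        return True
    return False


print(should_ignore(FakeRpcError(FakeStatusCode.UNAVAILABLE)))
print(should_ignore(ValueError("no status code here")))
```

In a real policy class, the body of on_exception would presumably call something like `should_ignore(exception, (grpc.StatusCode.UNAVAILABLE,))` and return early when it is true, falling back to the parent's on_exception otherwise.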