Ric*_*ane 3 python-2.7 google-cloud-pubsub
全部,
我正在尝试学习如何使用 GCP PubSub,并且可以通过 CLI 命令对其进行测试(创建主题、订阅、发布到主题、从订阅中拉取等),但是当我跳转到 python 时(v 2.7,当前公司标准)我正在努力以同步方式拉取消息。
我已经查看了这个网址,它告诉你去睡觉和“While True”,但我无法想象有人在现实世界中这样做,对吗? https://cloud.google.com/pubsub/docs/quickstart-client-libraries#pubsub-subscribe-python
这个网址告诉您可以使用 future.result(),我尝试过,但它不会像您想象的那样阻塞会话/线程: http: //google-cloud-python.readthedocs.io/en/latest/ pubsub/订阅者/index.html
有人还有其他想法吗?这是我的函数,它几乎直接来自示例之一:
def sample_receive_messages(subscription_name="my-sub", project=None):
"""Receives messages from a pull subscription."""
if not project:
credentials, project = google.auth.default()
subscriber = psub.SubscriberClient()
subscription_path = subscriber.subscription_path(
project, subscription_name)
def callback(message):
# print('Received message: {}'.format(message))
message.ack()
print('<message>' + str(message.data) + '</message>')
subscription = subscriber.subscribe(subscription_path)
future = subscription.open(callback)
myResult = future.result()
subscription.close()
print("done")
Run Code Online (Sandbox Code Playgroud)
我最后的目标是有一个进程经常醒来,抓取消息并确认它们,将消息写入文件,然后结束。
到目前为止,该进程读取消息并将其打印出来(很棒),但它然后坐啊坐啊坐,最后错误地出现了一些乱码:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "pubSubTools.py", line 50, in sample_receive_messages
myResult = future.result()
File "/usr/local/lib/python2.7/site-packages/google/cloud/pubsub_v1/futures.py", line 98, in result
raise err
grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.UNAVAILABLE, The service was unable to fulfill your request. Please try again. [code=8a75])>
Run Code Online (Sandbox Code Playgroud)
小智 5
库文档中对此进行了描述:
from google.cloud import SubscriberClient
pubsub_client = SubscriberClient()
subscription_path = 'projects/{project}/subscriptions/{subscription}'.format(project=project_name, subscription=subscription_name)
pull_response= pubsub_client.pull(subscription=subscription_path, max_messages=10)
for msg in pull_response.received_messages:
message = msg.message.data.decode('utf-8')
# do your thing
pubsub_client.acknowledge(subscription_path, [msg.ack_id])
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
5077 次 |
| 最近记录: |