3 python-3.x python-asyncio nats.io
我正在尝试让 NATS 订阅者持续收听已发布的消息。
我的发布者是一个可以在浏览器中访问的 API 端点。我的订阅者是一个Python应用程序,它应该永远运行,监听已发布的消息。
我的问题是订户从不打印任何内容。如果我将 run_forever() 更改为loop.close(),它可以工作,但会立即关闭。我知道发布者正在工作,因为我可以看到 NATS 服务器的打印输出。
我正在 docker-compose 中运行所有内容。
我的订阅者:
import asyncio
from nats.aio.client import Client as NATS
async def run(loop):
await nc.connect("nats://nats:4222", loop=loop)
async def message_handler_A(msg):
print('fsfdsfdsfdsfdsf')
async def message_handler_B(msg):
print('fdsfdsfdsfdsf')
async def message_handler_C(msg):
print('sdfdsfdsf')
await nc.subscribe("message_handler_A", cb=message_handler_A)
await nc.subscribe("message_handler_B", cb=message_handler_B)
await nc.subscribe("message_handler_C", cb=message_handler_C)
print('receiving')
if __name__ == '__main__':
print("RUNNING")
nc = NATS()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(run(loop))
loop.run_forever()
Run Code Online (Sandbox Code Playgroud)
我的出版商:
import connexion
import six
import json
import asyncio
from nats.aio.client import Client as NATS
from swagger_server import util
async def run(loop):
nc = NATS()
# [begin publish_json]
await nc.connect("nats://nats:4222", loop=loop)
for i in range(10):
await nc.publish("message_handler_B", b"")
await nc.publish("message_handler_C", b"")
await nc.publish("message_handler_A", b"")
def healthz_get(): # noqa: E501
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(run(loop))
loop.close()
return 'Processing Request'
Run Code Online (Sandbox Code Playgroud)
我的码头工人组成:
version: '3'
services:
nats:
image: 'nats:0.8.0'
entrypoint: "/gnatsd -DV"
expose:
- "4222"
ports:
- "4222:4222"
hostname: nats-server
data_api:
restart: always
build: ..\data_api
image: data_api
container_name: data_api
ports:
- "5022:5022"
depends_on:
- "POCpostgres"
- "queue_app"
queue_app:
build: ..\queue_app
image: queue_app
container_name: queue_app
ports:
- "5023:5023"
Run Code Online (Sandbox Code Playgroud)
小智 6
答案是使用 Nats 流媒体服务:STAN:
订户:
import asyncio
from nats.aio.client import Client as NATS
from stan.aio.client import Client as STAN
async def run(loop):
nc = NATS()
sc = STAN()
# Start session with NATS Streaming cluster using
# the established NATS connection.
await nc.connect(io_loop=loop)
await sc.connect("test-cluster", "client-123", nats=nc)
# Example async subscriber
async def cb(msg):
print("Received a message (seq={}): {}".format(msg.seq, msg.data))
# Subscribe to get all messages from the beginning.
await sc.subscribe("greetings", start_at='first', cb=cb)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(run(loop))
loop.run_forever()
Run Code Online (Sandbox Code Playgroud)
发布者: import connexion import 6 from swagger_server import util
import asyncio
from nats.aio.client import Client as NATS
from stan.aio.client import Client as STAN
async def run(loop):
nc = NATS()
sc = STAN()
# First connect to NATS, then start session with NATS Streaming.
await nc.connect(io_loop=loop)
await sc.connect("test-cluster", "client-456", nats=nc)
await sc.publish("greetings", b'Hello World!')
await nc.flush(1)
print("sent")
await sc.close()
await nc.close()
def healthz_get(): # noqa: E501
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(run(loop))
loop.close()
return 'Processing Request'
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2977 次 |
| 最近记录: |