我有一个celery worker,它使用的gevent池执行HTTP请求,并使用页面源添加另一个celery任务。
我正在使用Django,RabbitMQ作为代理,Redis作为celery结果后端,Celery 4.1.0。
任务已完成,ignore_result=True但我经常收到此错误ConcurrentObjectUseError: This socket is already used by another greenlet: <bound method Waiter.switch of <gevent.hub.Waiter...>
我看到它与Redis连接有关。
我不知道如何解决这个问题。这或多或少是任务的逻辑。我还尝试在调用时使用信号量,process_task.apply_async但是没有用。
from gevent.lock import BoundedSemaphore
sem = BoundedSemaphore(1)
@app.task(ignore_result=True, queue='request_queue')
def request_task(url, *args, **kwargs):
# make the request
req = requests.get(url)
request = {
'status_code': req.status_code,
'content': req.text,
'headers': dict(req.headers),
'encoding': req.encoding
}
with sem:
process_task.apply_async(kwargs={'url': url, 'request': request})
print(f'Done - {url}')
Run Code Online (Sandbox Code Playgroud)
这是堆栈跟踪:
cancel_wait_ex: [Errno 9] File descriptor was closed in another greenlet
File …Run Code Online (Sandbox Code Playgroud) 我有一个 gRPC 服务器,可以无限期地传输数据。它生成一个子进程来检索一些硬件资源统计信息。
我想要的是取消来自客户端的 gRPC 调用,以便我可以终止生成的进程。另一种解决方案是在客户端断开连接时在服务器上进行检测,以便我可以终止该进程。
我在客户端和服务器上都使用 grpc-node 。
我似乎还不知道该怎么做......