Python Celery - How to call a Celery task from within another task

Pyt*_*ast 6 python django celery djcelery

I am calling a task from within another task in Django-Celery.

Here are my tasks.

import json

import requests
from celery import shared_task

# Notification and ServerNotificationMapping are the asker's Django models (import not shown).


@shared_task
def post_notification(data, url):
    url = "http://posttestserver.com/data/?dir=praful"  # when in production, remove this line.
    headers = {'content-type': 'application/json'}
    requests.post(url, data=json.dumps(data), headers=headers)


@shared_task
def shipment_server(data, notification_type):
    notification_obj = Notification.objects.get(name=notification_type)
    server_list = ServerNotificationMapping.objects.filter(notification_name=notification_obj)

    for server in server_list:
        task = post_notification.delay(data, server.server_id.url)
        print(task.status)  # it prints 'Nonetype' has no attribute id

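How do I call a task from within a task? I read somewhere that this can be done with group, but I could not work out the correct syntax. How do I do it?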

I tried this:

for server in server_list:
    task = group(post_notification.s(data, server.server_id.url))().get()
    print(task.status)

It gives a warning saying:

TxIsolationWarning: Polling results with transaction isolation level repeatable-read within the same transaction may give outdated results. Be sure to commit the transaction for each poll iteration.
  'Polling results with transaction isolation level '

I have no idea what this means!

How can I solve my problem?

And*_*bin 7

This should work:

celery.current_app.send_task('mymodel.tasks.mytask', args=[arg1, arg2, arg3])

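  • Note that you don't have to use `send_task`; you can call `task.delay` from within a task without any problem. Your problem is polling the returned result object. (2 upvotes)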

Sye*_*b M 5

You're right: each task in your for loop overwrites the task variable.

You can try celery.group:

from celery import group

@shared_task
def shipment_server(data, notification_type):
    notification_obj = Notification.objects.get(name=notification_type)
    server_list = ServerNotificationMapping.objects.filter(notification_name=notification_obj)

    # Build one signature per server, then dispatch them all as a single group.
    tasks = [post_notification.s(data, server.server_id.url) for server in server_list]
    results = group(tasks)()
    print(results.get())  # or results.ready(), results.successful(), whatever you want