芹菜任务下面这两项任务的区别

Gra*_*ntU 4 django django-celery

下面这两项任务有什么区别?

第一个给出错误,第二个运行得很好.两者都是相同的,它们接受额外的参数,并且它们都以相同的方式被调用.

ProcessRequests.delay(batch) **error object.__new__() takes no parameters**


SendMessage.delay(message.pk, self.pk) **works!!!!**   
Run Code Online (Sandbox Code Playgroud)

现在,我已经意识到错误意味着什么,但我的困惑是为什么一个工作而不是另一个工作.

任务...

1)

class ProcessRequests(Task):
    name = "Request to Process"
    max_retries = 1
    default_retry_delay = 3

    def run(self, batch):
       #do something
Run Code Online (Sandbox Code Playgroud)

2)

class SendMessage(Task):
    name = "Sending SMS"
    max_retries = 10
    default_retry_delay = 3

    def run(self, message_id, gateway_id=None, **kwargs):
        #do something
Run Code Online (Sandbox Code Playgroud)

完整任务代码....

from celery.task import Task
from celery.decorators import task

import logging

from sms.models import Message, Gateway, Batch
from contacts.models import Contact
from accounts.models import Transaction, Account


class SendMessage(Task):
    name = "Sending SMS"
    max_retries = 10
    default_retry_delay = 3

    def run(self, message_id, gateway_id=None, **kwargs):
        logging.debug("About to send a message.")

        # Because we don't always have control over transactions
        # in our calling code, we will retry up to 10 times, every 3
        # seconds, in order to try to allow for the commit to the database
        # to finish. That gives the server 30 seconds to write all of
        # the data to the database, and finish the view.
        try:
            message = Message.objects.get(pk=message_id)
        except Exception as exc:
            raise SendMessage.retry(exc=exc)


        if not gateway_id:
            if hasattr(message.billee, 'sms_gateway'):
                gateway = message.billee.sms_gateway
            else:
                gateway = Gateway.objects.all()[0]
        else:
            gateway = Gateway.objects.get(pk=gateway_id)

        # Check we have a credits to sent me message
        account = Account.objects.get(user=message.sender)
        # I'm getting the non-cathed version here, check performance!!!!!
        if account._balance() >= message.length:
            response = gateway._send(message)

            if response.status == 'Sent':
                # Take credit from users account.
                transaction = Transaction(
                    account=account,
                    amount=- message.charge,
                    description="Debit: SMS Sent",

                    )
                transaction.save()
                message.billed = True
                message.save()
        else:
            pass


        logging.debug("Done sending message.")


class ProcessRequests(Task):
    name = "Request to Process"
    max_retries = 1
    default_retry_delay = 3

    def run(self, message_batch):
        for e in Contact.objects.filter(contact_owner=message_batch.user, group=message_batch.group):
            msg = Message.objects.create(
                recipient_number=e.mobile,
                content=message_batch.content,
                sender=e.contact_owner,
                billee=message_batch.user,
                sender_name=message_batch.sender_name
            )
            gateway = Gateway.objects.get(pk=2)
            msg.send(gateway)
            #replace('[FIRSTNAME]', e.first_name)
Run Code Online (Sandbox Code Playgroud)

尝试:

 ProcessRequests.delay(batch) should work gives error error object.__new__() takes no parameters     
 ProcessRequests().delay(batch) also gives error error object.__new__() takes no parameters
Run Code Online (Sandbox Code Playgroud)

gat*_*tto 6

我能够重现你的问题:

import celery
from celery.task import Task

@celery.task
class Foo(celery.Task):
    name = "foo"
    def run(self, batch):
       print 'Foo'

class Bar(celery.Task):
    name = "bar"
    def run(self, batch):
       print 'Bar'

# subclass deprecated base Task class
class Bar2(Task):
    name = "bar2"
    def run(self, batch):
       print 'Bar2'

@celery.task(name='def-foo')
def foo(batch):
    print 'foo'
Run Code Online (Sandbox Code Playgroud)

输出:

In [2]: foo.delay('x')
[WARNING/PoolWorker-4] foo

In [3]: Foo().delay('x')
[WARNING/PoolWorker-2] Foo

In [4]: Bar().delay('x')
[WARNING/PoolWorker-3] Bar

In [5]: Foo.delay('x')
TypeError: object.__new__() takes no parameters

In [6]: Bar.delay('x')
TypeError: unbound method delay() must be called with Bar instance as first argument (got str instance instead)

In [7]: Bar2.delay('x')
[WARNING/PoolWorker-1] Bar2
Run Code Online (Sandbox Code Playgroud)

我看到你使用弃用的celery.task.Task基类,这就是你没有得到unbound method错误的原因:

Definition: Task(self, *args, **kwargs)
Docstring:
Deprecated Task base class.

Modern applications should use :class:`celery.Task` instead.
Run Code Online (Sandbox Code Playgroud)

我不知道为什么ProcessRequests不起作用.也许这是一些缓存问题,您可能已经尝试过将装饰器应用到您的类中并且它被缓存了,这正是您尝试将此装饰器应用于Task类时所获得的错误.

删除所有.pyc文件,重新启动芹菜工作者,然后重试.

不要直接使用类

  1. 任务每个(工作者)进程只实例化一次,因此每次创建任务类对象(在客户端)都没有意义,即Bar()错误.
  2. Foo.delay()或者Foo().delay()可能或可能不起作用,取决于装饰器name参数和类name属性的组合.

celery.registry.tasks字典中获取任务对象或仅使用@celery.task函数上的装饰器(foo在我的示例中).