Asked by Wha*_*ame · Tags: python, redis, celery
So basically I have a fairly complex workflow that looks something like this:
>>> res = (add.si(2, 2) | add.s(4) | add.s(8))()
>>> res.get()
16
Afterwards, it's fairly trivial for me to walk back along the result chain and collect each individual result:
>>> res.parent.get()
8
>>> res.parent.parent.get()
4
My question is: what if my third task depends on knowing the result of the first one, while in this example it only receives the result of the second one?
Also, the chain is actually much longer and the results are not small, so simply passing them through as input would needlessly pollute the result store. That store is Redis, so the limitations that apply when using RabbitMQ, ZeroMQ... don't apply here.
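For reference, the parent walk mentioned above can be wrapped in a small helper. This is a minimal sketch: `FakeResult` is a stand-in that mimics `AsyncResult`'s `.get()` and `.parent` so the snippet runs without a broker, and `collect_chain_results` is a hypothetical name, not a Celery API.

```python
class FakeResult:
    """Stand-in for celery.result.AsyncResult: exposes .get() and .parent."""
    def __init__(self, value, parent=None):
        self.value = value
        self.parent = parent

    def get(self):
        return self.value


def collect_chain_results(res):
    """Walk .parent links from the final result back to the root,
    returning the results in execution order."""
    results = []
    node = res
    while node is not None:
        results.append(node.get())
        node = node.parent
    return list(reversed(results))


# Mirror the chain from the question: 2+2=4, then +4=8, then +8=16
root = FakeResult(4)
mid = FakeResult(8, parent=root)
final = FakeResult(16, parent=mid)
print(collect_chain_results(final))  # [4, 8, 16]
```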
Answered by 小智 (score 5):
Maybe your setup is too complex for this, but I like using group combined with a noop task to accomplish something similar. I do it this way because I want to highlight the places in my pipeline that are still synchronous (usually so they can be removed).
Using something similar to your example, I start with a set of tasks that looks like this:
tasks.py:
from celery import Celery

app = Celery('tasks', backend="redis", broker='redis://localhost')

@app.task
def add(x, y):
    return x + y

@app.task
def xsum(elements):
    return sum(elements)

@app.task
def noop(ignored):
    return ignored
With those tasks, I use a group to build a chain that controls the result which depends on a synchronous result:
In [1]: from tasks import add,xsum,noop
In [2]: from celery import group
# First I run the task which I need the value of later, then I send that result to a group where the first task does nothing and the other tasks are my pipeline.
In [3]: ~(add.si(2, 2) | group(noop.s(), add.s(4) | add.s(8)))
Out[3]: [4, 16]
# At this point I have a list where the first element is the result of my original task and the second element has the result of my workflow.
In [4]: ~(add.si(2, 2) | group(noop.s(), add.s(4) | add.s(8)) | xsum.s())
Out[4]: 20
# From here, things can go back to a normal chain
In [5]: ~(add.si(2, 2) | group(noop.s(), add.s(4) | add.s(8)) | xsum.s() | add.s(1) | add.s(1))
Out[5]: 22
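To see just the data flow of this pattern, here is a plain-Python simulation of the same group-with-noop pipeline; the function names mirror the tasks above, but nothing here touches Celery or a broker.

```python
def add(x, y):
    return x + y

def noop(x):
    return x

def xsum(elements):
    return sum(elements)

# add.si(2, 2) runs first; its result feeds the group.
first = add(2, 2)                                # 4
# In the group, noop preserves the early result while
# the inner chain keeps computing.
grouped = [noop(first), add(add(first, 4), 8)]   # [4, 16]
# xsum then sees both the original result and the pipeline result.
total = xsum(grouped)                            # 20
print(grouped, total)
```

The point of the simulation is that the group fans the early value out into two branches, so a downstream task receives both the first result and the chain's final result in one list.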
I hope this is useful!
Answered by 小智 (score 3):
I assign a job ID to each chain and track the job by persisting its data in a database.
Starting the queue:
import uuid

if __name__ == "__main__":
    # Generate a unique id for the job
    job_id = uuid.uuid4().hex
    # This is the root parent
    parent_level = 1
    # Pack the data. The last value is the value you want to add
    parameters = job_id, parent_level, 2
    # Build the chain. I added a clean task that removes the data
    # created during the process (if you want that)
    add_chain = add.s(parameters, 2) | add.s(4) | add.s(8) | clean.s()
    add_chain.apply_async()
Now the tasks:
# Function for storing a result. I used sqlalchemy (mysql) but you can
# change it to whatever you want (a distributed file system, for example)
@inject.params(entity_manager=EntityManager)
def save_result(job_id, level, result, entity_manager):
    r = Result()
    r.job_id = job_id
    r.level = level
    r.result = result
    entity_manager.add(r)
    entity_manager.commit()

# Restore the result of one ancestor
@inject.params(entity_manager=EntityManager)
def get_result(job_id, level, entity_manager):
    result = entity_manager.query(Result).filter_by(job_id=job_id, level=level).one()
    return result.result

# Clear the data, or do something with the final result
@inject.params(entity_manager=EntityManager)
def clear(job_id, entity_manager):
    entity_manager.query(Result).filter_by(job_id=job_id).delete()

@app.task()
def add(parameters, number):
    # Extract data from the parameters tuple
    job_id, level, other_number = parameters
    # Load the result of your second parent (level - 2);
    # for the third parent use level - 3, and so on
    # second_parent_result = get_result(job_id, level - 2)
    # do your stuff -- I guess you want to add numbers
    result = number + other_number
    save_result(job_id, level, result)
    # You have to return something, because the next "add" expects 3 values.
    # Return the job id, the incremented parent level and the result
    return job_id, level + 1, result

@app.task()
def clean(parameters):
    job_id, level, result = parameters
    # Do something with the final result, or not
    # Clear the data
    clear(job_id)
I use an entity_manager to handle the database operations. My entity manager uses SQLAlchemy and MySQL, and a "result" table to store the partial results. This is the part you should swap for whatever storage system suits you best (or keep as-is if MySQL works for you):
import os

import inject
import yaml
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class Configuration:
    def __init__(self, params):
        f = open(os.environ.get('PYTHONPATH') + '/conf/config.yml')
        self.configMap = yaml.safe_load(f)
        f.close()

    def __getitem__(self, key: str):
        return self.configMap[key]

class EntityManager():
    session = None

    @inject.params(config=Configuration)
    def __init__(self, config):
        conf = config['persistence']
        uri = conf['driver'] + "://" + conf['username'] + ":@" + conf['host'] + "/" + conf['database']
        engine = create_engine(uri, echo=conf['debug'])
        Session = sessionmaker(bind=engine)
        self.session = Session()

    def query(self, entity_type):
        return self.session.query(entity_type)

    def add(self, entity):
        return self.session.add(entity)

    def flush(self):
        return self.session.flush()

    def commit(self):
        return self.session.commit()

class Result(Base):
    __tablename__ = 'result'

    id = Column(Integer, primary_key=True)
    job_id = Column(String(255))
    level = Column(Integer)
    result = Column(Integer)

    def __repr__(self):
        return "<Result (job='%s', level='%s', result='%s')>" % (self.job_id, str(self.level), str(self.result))
I use the inject package for dependency injection. The inject package reuses the object, so you can inject database access every time you need it without worrying about the connection.

The Configuration class loads the database-access data from a config file. For testing you can replace it with static data (a hard-coded map).

Swap the dependency injection for anything else that suits you; this is just my solution, and I only added it for quick testing.

The key here is to save the partial results somewhere outside the queue system, and to have the tasks return the data needed to access those results (job_id and parent level). You send this extra (but small) data, which is an address (job_id + parent level) pointing to the real (presumably large) data.

This solution is what I use in my own software.
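The pointer-passing idea can be reduced to a minimal in-memory sketch: a plain dict stands in for the database, and no Celery or SQLAlchemy is involved. The names mirror the functions above.

```python
import uuid

store = {}  # stands in for the 'result' table

def save_result(job_id, level, result):
    store[(job_id, level)] = result

def get_result(job_id, level):
    return store[(job_id, level)]

def add(parameters, number):
    # Tasks pass along only the small (job_id, level, result) pointer
    job_id, level, other_number = parameters
    result = number + other_number
    save_result(job_id, level, result)
    return job_id, level + 1, result

job_id = uuid.uuid4().hex
params = (job_id, 1, 2)
params = add(params, 2)   # level 1 stores 4
params = add(params, 4)   # level 2 stores 8
# The next task (level 3) can fetch its grandparent's result by address:
grandparent = get_result(params[0], params[1] - 2)
print(grandparent)  # 4
```

Every task only ever carries the (job_id, level) address through the chain, while the large payloads stay in the external store, which is exactly what keeps the Redis result backend clean.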