我有一个很大的 csv 文件,我将其拆分为一个包含 100000 行的块列表,将每个块传递给一个函数来进行复杂的计算,并将结果附加到 global_list 中。当最后一个块完成时,我使用 global_list 并对其进行一些统计。我怎样才能让 celery 并行处理所有块,但要等到最后一个任务/最后一个块完成后再在 global_list 上执行函数 complex_calc?
感谢您的帮助
for chunk in global_chunk_list:
def func_calc.delay(chunk) #<<<<< use celery tasks
complex_calc(global_list) #<<<<< should only start when processing last chunk is finished
@celery.task(name='func_calc')
def func_calc(chunk):
...
#save chunk in a global list
global_list.append(result)
def complex_calc(global_list):
...
Run Code Online (Sandbox Code Playgroud) 我是 python 和 SQLAlchemy 的新手。我在python 3
我创建了一个类testtbl 来操作表。一个功能是update 我通过的地方:
myfilter = {'origin_source_id':'MX01’}.
在mysql表中找到这一行origin_source_id = ‘MX01'
updatevalue = {'origin_source_id':'CAL01’}
将找到的 origin_source_id 替换为 'CAL01;
电话
testtbl.update_row(myfilter,updatevalue)
Run Code Online (Sandbox Code Playgroud)
功能
def update_row(self,locaterowfilter,updatevalue):
self.Session.query( self.TableClass ).filter_by( **locaterowfilter ).update( updatevalue )
Run Code Online (Sandbox Code Playgroud)
我收到以下错误
target_cls = query._mapper_zero().class_
AttributeError: 'NoneType' object has no attribute ‘class_’
Run Code Online (Sandbox Code Playgroud)
我不知道如何处理它。
有没有更好的方法来做到这一点?