Is multiprocessing.Pool not allowed in Airflow tasks? - AssertionError: daemonic processes are not allowed to have children

Can*_*ice 9 python multiprocessing airflow

Our Airflow project has a task that queries BigQuery and uses Pool to dump the results to local JSON files in parallel:

from functools import partial
from multiprocessing import Pool

def dump_in_parallel(table_name):
    base_query = f"select * from models.{table_name}"
    all_conf_ids = range(1,10)
    n_jobs = 4

    # dump_conf_id (defined elsewhere) runs the query for one conf id
    # and writes /tmp/output_file_<i>.json
    with Pool(n_jobs) as p:
        p.map(partial(dump_conf_id, base_query = base_query), all_conf_ids)

    with open("/tmp/final_output.json", "wb") as f:
        filenames = [f'/tmp/output_file_{i}.json' for i in all_conf_ids]
        # (concatenation of the per-id files into f omitted here)

This task ran fine in Airflow v1.10 but no longer runs in v2.1+. Section 2.1 here - https://blog.mbedded.ninja/programming/languages/python/python-multiprocessing/ - mentions: "If you try to create a Pool from within a child process that was itself created with a Pool, you will run into the error: daemonic processes are not allowed to have children."

Here is the full Airflow error:

[2021-08-22 02:11:53,064] {taskinstance.py:1462} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1164, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1282, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1312, in _execute_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 150, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 161, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/usr/local/airflow/plugins/tasks/bigquery.py", line 249, in dump_in_parallel
    with Pool(n_jobs) as p:
  File "/usr/local/lib/python3.7/multiprocessing/context.py", line 119, in Pool
    context=self.get_context())
  File "/usr/local/lib/python3.7/multiprocessing/pool.py", line 176, in __init__
    self._repopulate_pool()
  File "/usr/local/lib/python3.7/multiprocessing/pool.py", line 241, in _repopulate_pool
    w.start()
  File "/usr/local/lib/python3.7/multiprocessing/process.py", line 110, in start
    'daemonic processes are not allowed to have children'
AssertionError: daemonic processes are not allowed to have children

We run Airflow with the LocalExecutor, if that matters. Any idea why this task that uses Pool works in Airflow v1.10 but no longer works in Airflow 2.1?

Jar*_*iuk 8

Airflow 2 uses a different processing model under the hood to speed up processing while keeping process-based isolation between running tasks.

That's why it uses forking and multiprocessing under the hood to run the tasks, but this also means that if you use multiprocessing yourself, you hit the limitation of Python multiprocessing that does not allow chained multiprocessing: a daemonic pool worker cannot spawn children of its own.
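The limitation is easy to reproduce outside Airflow. A minimal standalone sketch (no Airflow involved) of a pool worker trying to create its own Pool:

import multiprocessing as mp

def worker(_):
    # Pool workers are started as daemonic processes, so creating a
    # nested Pool here raises:
    # AssertionError: daemonic processes are not allowed to have children
    with mp.Pool(2) as inner:
        inner.map(print, range(2))

if __name__ == "__main__":
    with mp.Pool(2) as outer:
        outer.map(worker, range(2))  # re-raises the worker's AssertionError

In Airflow 2 with LocalExecutor, your task callable is already running inside such a forked daemonic process, so the first Pool you create plays the role of "inner" above.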

I am not 100% sure it will work, but you can try setting the execute_tasks_new_python_interpreter configuration to True: https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#execute-tasks-new-python-interpreter. This setting causes Airflow to start a new Python interpreter when running tasks instead of forking/using multiprocessing (though I am not 100% sure of the latter). It will run somewhat slower though (up to a few seconds of overhead per task), because the new Python interpreter has to reinitialize and import all the Airflow code before running the task.
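For reference, that option lives in the [core] section, so (assuming the standard Airflow config conventions) it can be set either in airflow.cfg or via the matching environment variable:

# airflow.cfg
[core]
execute_tasks_new_python_interpreter = True

# or, equivalently, as an environment variable on the worker/scheduler:
# export AIRFLOW__CORE__EXECUTE_TASKS_NEW_PYTHON_INTERPRETER=True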

If that does not work, you can use the PythonVirtualenvOperator to launch your multiprocessing job - that operator will start a new Python interpreter to run your Python code, and you should be able to use multiprocessing there.
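A rough sketch of that approach, assuming the dump logic can be made self-contained (the operator serializes the callable and runs it in a fresh virtualenv, so all imports must live inside it; the task_id, requirements, and map target below are placeholders):

from airflow.operators.python import PythonVirtualenvOperator

def dump_in_parallel_isolated():
    # Runs in a new Python interpreter, so the daemonic-process
    # restriction inherited from the forked Airflow task process
    # does not apply here.
    from multiprocessing import Pool

    with Pool(4) as p:
        # abs stands in for the real per-conf-id dump function; the map
        # target must be picklable (module-level or builtin)
        return p.map(abs, range(-4, 5))

dump_task = PythonVirtualenvOperator(
    task_id="dump_in_parallel",
    python_callable=dump_in_parallel_isolated,
    requirements=[],            # add e.g. google-cloud-bigquery as needed
    system_site_packages=True,  # reuse packages already on the worker
)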


Can*_*ice 7

Per https://github.com/celery/celery/issues/4525, replacing the multiprocessing library with the billiard library works. We have no idea why substituting one library for the other resolves this issue...
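The swap is essentially a one-line import change, since billiard (the multiprocessing fork that Celery uses) exposes the same Pool API. A minimal sketch with a placeholder work function:

from billiard import Pool  # drop-in for multiprocessing.Pool

def work(i):
    return i * i  # placeholder for the real dump_conf_id work

if __name__ == "__main__":
    with Pool(4) as p:
        print(p.map(work, range(1, 10)))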