Airflow 中的 Joblib 和其他并行任务

Mic*_*ael 8 python parallel-processing multiprocessing joblib airflow

我过去使用过 Joblib 和 Airflow 并且没有遇到过这个问题。我正在尝试通过 Airflow 运行一个使用 Joblib 运行并行计算的作业。当 Airflow 作业启动时,我看到以下警告

UserWarning: Loky-backed parallel loops cannot be called in multiprocessing, setting n_jobs=1
Run Code Online (Sandbox Code Playgroud)

将警告追溯到源头我看到 LokyBackend 类的 joblib 包中触发了以下函数(MultiprocessingBackend 类中也有类似的逻辑)

def effective_n_jobs(self, n_jobs):
    """Determine the number of jobs which are going to run in parallel"""
    if n_jobs == 0:
        raise ValueError('n_jobs == 0 in Parallel has no meaning')
    elif mp is None or n_jobs is None:
        # multiprocessing is not available or disabled, fallback
        # to sequential mode
        return 1
    elif mp.current_process().daemon:
        # Daemonic processes cannot have children
        if n_jobs != 1:
            warnings.warn(
                'Loky-backed parallel loops cannot be called in a'
                ' multiprocessing, setting n_jobs=1',
                stacklevel=3)
        return 1
Run Code Online (Sandbox Code Playgroud)

问题是我之前在 Joblib 和 Airflow 中运行过类似的函数并且没有触发这个条件设置为n_jobs等于 1。想知道这是否是某种类型的版本问题(使用 Airflow 2.X 和 Joblib 1.X)或者是否有 Airflow 中的设置可以解决此问题。我查看了旧版本的 Joblib,甚至降级到 Joblib 0.4.0 但这并没有解决任何问题。由于 API、数据库连接等方面的差异,我更不愿意降级 Airflow。


编辑:

这是我在 Airflow 中运行的代码:

def test_parallel():
    out=joblib.Parallel(n_jobs=-1, backend="loky")(joblib.delayed(lambda a: a+1)(i) for i in range(20))

with DAG("test", default_args=DEFAULT_ARGS, schedule_interval="0 8 * * *",) as test:
    run_test = PythonOperator(
        task_id="test",
        python_callable=test_parallel,
    )

    run_test
Run Code Online (Sandbox Code Playgroud)

以及气流日志中的输出:

[2021-07-27 10:41:29,890] {logging_mixin.py:104} WARNING - /data01/code/virtualenv/alpha/lib/python3.8/site-packages/joblib/parallel.py:733 UserWarning: Loky-backed parallel loops cannot be called in a multiprocessing, setting n_jobs=1
Run Code Online (Sandbox Code Playgroud)

我启动airflow schedulerairflow webserver通过supervisor. 但是,即使我从命令行启动了两个气流进程,问题仍然存在。然而,当我只是通过气流任务 API 运行任务时,它不会发生,例如airflow tasks test run_test

Ann*_*Zen 0

我注意到您没有调用run_test代码底部的函数。这可能是任何问题的原因吗?更正版本:

def test_parallel():
    out=joblib.Parallel(n_jobs=-1, backend="loky")(joblib.delayed(lambda a: a+1)(i) for i in range(20))

with DAG("test", default_args=DEFAULT_ARGS, schedule_interval="0 8 * * *",) as test:
    run_test = PythonOperator(
        task_id="test",
        python_callable=test_parallel,
    )

    run_test()
Run Code Online (Sandbox Code Playgroud)