Airflow unexpectedly sends SIGTERM to subprocesses

jim*_*ssy 7 python airflow

I am using a PythonOperator to call a function that parallelizes a data-engineering process as an Airflow task. This is done simply by wrapping a plain function in a callable wrapper that Airflow invokes:

def wrapper(ds, **kwargs):
    process_data()
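For context, process_data looks roughly like this (a minimal sketch with illustrative chunk data and a `_transform` placeholder, not my actual pipeline), fanning the work out over a multiprocessing.Pool:

```python
from multiprocessing import Pool

def _transform(chunk):
    # stand-in for the real per-chunk processing
    return sum(chunk)

def process_data():
    # fan the chunks out to a pool of worker subprocesses
    chunks = [[1, 2], [3, 4], [5, 6]]
    with Pool(processes=2) as pool:
        results = pool.map(_transform, chunks)
    return results

def wrapper(ds, **kwargs):
    return process_data()
```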

process_data achieves its parallelism with the multiprocessing module, which spawns subprocesses. When I run process_data on its own from a Jupyter notebook, it runs to completion without issue. However, when I run it through Airflow, the task fails and the task log shows something like the following.

[2019-01-22 17:16:46,966] {models.py:1610} ERROR - Received SIGTERM. Terminating subprocesses.
[2019-01-22 17:16:46,969] {logging_mixin.py:95} WARNING - Process ForkPoolWorker-129:

[2019-01-22 17:16:46,973] {logging_mixin.py:95} WARNING - Traceback (most recent call last):

[2019-01-22 17:16:46,973] {logging_mixin.py:95} WARNING -   File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
    self.run()

[2019-01-22 17:16:46,973] {logging_mixin.py:95} WARNING -   File "/home/airflow/.env/lib/python3.5/site-packages/airflow/models.py", line 1612, in signal_handler
    raise AirflowException("Task received SIGTERM signal")

[2019-01-22 17:16:46,973] {logging_mixin.py:95} WARNING - airflow.exceptions.AirflowException: Task received SIGTERM signal

[2019-01-22 17:16:46,993] {models.py:1610} ERROR - Received SIGTERM. Terminating subprocesses.
[2019-01-22 17:16:46,996] {logging_mixin.py:95} WARNING - Process ForkPoolWorker-133:

[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING - Traceback (most recent call last):

[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING -   File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
    self.run()

[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING -   File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)

[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING -   File "/usr/lib/python3.5/multiprocessing/pool.py", line 108, in worker
    task = get()

[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING -   File "/usr/lib/python3.5/multiprocessing/queues.py", line 343, in get
    res = self._reader.recv_bytes()

[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING -   File "/usr/lib/python3.5/multiprocessing/synchronize.py", line 99, in __exit__
    return self._semlock.__exit__(*args)

[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING -   File "/home/airflow/.env/lib/python3.5/site-packages/airflow/models.py", line 1612, in signal_handler
    raise AirflowException("Task received SIGTERM signal")

[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING - airflow.exceptions.AirflowException: Task received SIGTERM signal

[2019-01-22 17:16:47,086] {logging_mixin.py:95} INFO - file parsing and processing 256.07

[2019-01-22 17:17:12,938] {logging_mixin.py:95} INFO - combining and sorting 25.85

I am not sure why the task receives a SIGTERM. My guess is that some higher-level process is sending it to the subprocesses. What should I do to debug this issue?

I just noticed that toward the end of the task's log, it explicitly states:

airflow.exceptions.AirflowException: Task received SIGTERM signal
[2019-01-22 12:31:39,196] {models.py:1764} INFO - Marking task as FAILED.

小智 10

I followed the answer here. The idea is the same: don't let Airflow kill the task's worker processes too early:

Exporting AIRFLOW__CORE__KILLED_TASK_CLEANUP_TIME=604800 did the trick.
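For reference, an `AIRFLOW__CORE__<KEY>` environment variable is Airflow's standard override for a key in the `[core]` section of airflow.cfg, so the equivalent config-file entry (a sketch; check your Airflow version's config reference) would be:

```ini
[core]
# Seconds a forcefully-killed task gets between SIGTERM and SIGKILL
# (the default is 60).
killed_task_cleanup_time = 604800
```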

  • Can someone explain what this parameter does? I don't fully understand Airflow's comment: ```# When a task is forcefully killed, this is the amount of time in seconds that it has to clean up after it is sent a SIGTERM, before it is SIGKILLED```. Why were our tasks forcefully killed in the first place? Also, raising it from the default 60 seconds to 604800 seconds (7 days) seems drastic. Are there any downsides to this change? Thanks! (4 upvotes)
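My reading of that comment, sketched below with a hypothetical `kill_like_airflow` helper (not Airflow's actual code): when a task is forcefully killed, the supervising process first sends SIGTERM, waits up to `killed_task_cleanup_time` seconds for the task to clean up and exit, and only then sends an un-catchable SIGKILL.

```python
import signal
import subprocess
import sys
import time

def kill_like_airflow(proc, cleanup_time):
    """Send SIGTERM, allow `cleanup_time` seconds for cleanup, then SIGKILL."""
    proc.send_signal(signal.SIGTERM)
    try:
        proc.wait(timeout=cleanup_time)   # grace period for cleanup
    except subprocess.TimeoutExpired:
        proc.kill()                       # SIGKILL: cannot be caught or ignored
        proc.wait()

# A stubborn child that ignores SIGTERM, so it must be SIGKILLed.
child = subprocess.Popen([
    sys.executable, "-c",
    "import signal, time;"
    "signal.signal(signal.SIGTERM, signal.SIG_IGN);"
    "time.sleep(60)",
])
time.sleep(0.5)  # give the child time to install its SIGTERM handler
start = time.monotonic()
kill_like_airflow(child, cleanup_time=2)
elapsed = time.monotonic() - start
```

Under this reading, the downside of a 7-day value is the mirror image of the original problem: a task that hangs after SIGTERM could linger for up to 604800 seconds before being SIGKILLed.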