标签: google-cloud-composer

当 PythonOperator 出现错误“Negsignal.SIGKILL”时,Airflow DAG 失败

我在 Cloud Composer v1.16.16 上运行 Airflowv1.10.15。

我的 DAG 看起来像这样:

from datetime import datetime, timedelta

# imports
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from scripts import workday_extract, workday_config_large

default_args = {
    'owner': 'xxxx',
    'depends_on_past': False,
    'start_date': datetime(2021, 9, 14),
    'email_on_failure': True,
    'email': ['xxxx'],
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
    'catchup': False
}


# Define the DAG with parameters
dag = DAG(
    dag_id='xxxx_v1',
    schedule_interval='0 20 * * *',
    default_args=default_args,
    catchup=False,
    max_active_runs=1,
    concurrency=1
)

def wd_to_bq(key, val, **kwargs):
    logger.info("workday …
Run Code Online (Sandbox Code Playgroud)

sigkill python-3.x google-cloud-platform airflow google-cloud-composer

13
推荐指数
1
解决办法
4万
查看次数

Dataproc Spark 运算符如何返回值以及如何捕获和返回该值

Airflow 中的 Dataproc Spark 运算符如何返回值以及如何捕获该值。

我有一个下游作业来捕获此结果,并根据返回的值,我必须通过分支操作员触发另一个作业。

apache-spark google-cloud-dataproc airflow google-cloud-composer

10
推荐指数
1
解决办法
575
查看次数

DAG无法在Google Cloud Composer网络服务器上点击,但在本地Airflow上正常运行

我正在使用带有图像版本和Python 2.7的Google Cloud Composer(在Google云平台上管理Airflow)composer-0.5.3-airflow-1.9.0,我面临一个奇怪的问题:导入我的DAG后,它们无法从Web UI中点击(并且没有按钮) "Trigger DAG","Graph view",...),虽然在运行本地Airflow时都能正常工作.

即使在Composer上的Web服务器中不可用,我的DAG仍然存在.我可以使用CLI(list_dags)列出它们,描述它们(list_tasks)甚至触发它们(trigger_dag).

再现问题的最小例子

我用来重现问题的一个最小例子如下所示.使用钩子(此处GoogleCloudStorageHook)非常重要,因为使用钩子时会发生Composer上的错误.最初,我使用自定义挂钩(在自定义插件中),并且面临同样的问题.

这里,该示例基本上列出了GCS存储桶(my-bucket)中的所有条目,并为每个以条目开头的条目生成DAG my_dag.

import datetime

from airflow import DAG
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.operators.bash_operator import BashOperator

google_conn_id = 'google_cloud_default'

gcs_conn = GoogleCloudStorageHook(google_conn_id)

bucket = 'my-bucket'
prefix = 'my_dag'

entries = gcs_conn.list(bucket, prefix=prefix)

for entry in entries:
    dag_id = str(entry)

    dag = DAG(
        dag_id=dag_id,
        start_date=datetime.datetime.today(),
        schedule_interval='0 0 1 * *'
    ) …
Run Code Online (Sandbox Code Playgroud)

python python-2.7 airflow airflow-scheduler google-cloud-composer

9
推荐指数
1
解决办法
1266
查看次数

仅在某些异常时重试 Airflow 任务实例

仅针对某些故障/异常重试 Airflow 运算符的最佳方法是什么?

例如,假设我有一个 Airflow 任务,该任务依赖于外部服务的可用性。如果该服务在任务执行期间变得不可用,我想稍后重试(最多重试 3 次)。对于其他失败我不想重试。

我当前的方法是通过解析来使用on_failure_callback和操作context["ti"].task.retries所需的异常context["exception"],但我认为这很混乱且难以理解。有更好的选择吗?

python airflow google-cloud-composer

9
推荐指数
1
解决办法
3398
查看次数

如何使用Cloud Composer选择Airflow或Python版本?

我正在使用Cloud Composer,我注意到它为我选择了Apache Airflow和Python(2.7.x)的版本.我想使用不同版本的Airflow和/或Python.我怎么能改变这个?

google-cloud-platform airflow google-cloud-composer

8
推荐指数
1
解决办法
1568
查看次数

Google Cloud Composer(Airflow) - DAG 内的数据流作业成功执行,但 DAG 失败

我的 DAG 看起来像这样

default_args = {
    'start_date': airflow.utils.dates.days_ago(0),
    'retries': 0,
    'dataflow_default_options': {
        'project': 'test',
        'tempLocation': 'gs://test/dataflow/pipelines/temp/',
        'stagingLocation': 'gs://test/dataflow/pipelines/staging/',
        'autoscalingAlgorithm': 'BASIC',
        'maxNumWorkers': '1',
        'region': 'asia-east1'
    }
}

dag = DAG(
    dag_id='gcs_avro_to_bq_dag',
    default_args=default_args,
    description='ETL for loading data from GCS(present in the avro format) to BQ',
    schedule_interval=None,
    dagrun_timeout=datetime.timedelta(minutes=30))

task = DataFlowJavaOperator(
    task_id='gcs_avro_to_bq_flow_job',
    jar='gs://test/dataflow/pipelines/jobs/test-1.0-SNAPSHOT.jar',
    poll_sleep=1,
    options={
        'input': '{{ ts }}',
    },
    dag=dag)

Run Code Online (Sandbox Code Playgroud)

我的 DAG 正在执行一个 jar 文件。jar 文件包含运行数据流作业的代码,该作业将数据从 GCS 写入 BQ。jar 本身成功执行。

当我尝试执行气流作业时,我看到以下错误

[2020-05-20 17:20:41,934] {base_task_runner.py:101} INFO - Job 274: Subtask gcs_avro_to_bq_flow_job [2020-05-20 17:20:41,840] …
Run Code Online (Sandbox Code Playgroud)

directed-acyclic-graphs google-cloud-platform airflow google-cloud-composer

8
推荐指数
1
解决办法
1336
查看次数

如何在 Google Composer 上重新启动气流服务器?

当我需要在本地重新启动网络服务器时,我会这样做:

ps -ef | grep airflow | awk '{print $2}' | xargs kill -9
airflow webserver -p 8080 -D
Run Code Online (Sandbox Code Playgroud)

如何在 Google Composer 上执行此操作?我在控制台中没有看到重启服务器的选项。

python google-compute-engine google-cloud-platform airflow google-cloud-composer

7
推荐指数
3
解决办法
6486
查看次数

如何在 Airflow 中管理 Python 依赖项?

在我的本地机器上,我创建了一个 virtualenv 并安装了 Airflow。当 dag 或插件需要 python 库时,我将它安装到同一个 virtualenv 中。

如何跟踪哪些库属于 dag,哪些库用于气流本身?我最近删除了一个 dag 并想删除它正在使用的库。这非常耗时,我交叉手指我没有删除另一个 dag 正在使用的东西!

airflow google-cloud-composer

7
推荐指数
2
解决办法
6277
查看次数

没有名为airfow.gcp的模块-如何运行使用python3 / beam 2.15的数据流作业?

当我使用像BigQueryHook这样的运算符/挂钩时,我看到一条消息,表明这些运算符已被弃用,并使用airflow.gcp ...运算符版本。但是,当我尝试在dag中使用它时,它失败并说没有名为airflow.gcp的模块。我拥有带beta功能的最新气流作曲家版本python3。是否可以通过某种方式安装这些运算符?

我正在尝试使用梁2.15在python 3中运行数据流作业。我已经尝试过virtualenv运算符,但这不起作用,因为它只允许使用python2.7。我怎样才能做到这一点?

python-3.x google-cloud-dataflow airflow google-cloud-composer airflow-operator

7
推荐指数
1
解决办法
159
查看次数

如何在 GCP Cloud Composer 上的 Apache Airflow 上使用带有“KubernetesPodOperator”的连接钩子作为环境变量

我想使用保存在airflow使用 KubernetesPodOperator.

在开发映像时,我使用环境变量将数据库连接信息传递给容器,但生产环境将数据库保存为连接挂钩。

提取数据库连接信息并将其传递给容器的最佳方法是什么?

env_vars = {'database_usr': 'xxx', 'database_pas': 'xxx'}
Run Code Online (Sandbox Code Playgroud)
KubernetesPodOperator(
        dag=dag,
        task_id="example-task",
        name="example-task",
        namespace="default",
        image="eu.gcr.io/repo/image:tag",
        image_pull_policy="Always",
        arguments=["-v", "image-command", "image-arg"],
        env_vars=env_vars,
    )
Run Code Online (Sandbox Code Playgroud)

kubernetes airflow google-cloud-composer kubernetes-operator

7
推荐指数
1
解决办法
1113
查看次数