我在 Cloud Composer v1.16.16 上运行 Airflowv1.10.15。
我的 DAG 看起来像这样:
from datetime import datetime, timedelta
# imports
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from scripts import workday_extract, workday_config_large
default_args = {
'owner': 'xxxx',
'depends_on_past': False,
'start_date': datetime(2021, 9, 14),
'email_on_failure': True,
'email': ['xxxx'],
'retries': 1,
'retry_delay': timedelta(minutes=2),
'catchup': False
}
# Define the DAG with parameters
dag = DAG(
dag_id='xxxx_v1',
schedule_interval='0 20 * * *',
default_args=default_args,
catchup=False,
max_active_runs=1,
concurrency=1
)
def wd_to_bq(key, val, **kwargs):
logger.info("workday …Run Code Online (Sandbox Code Playgroud) sigkill python-3.x google-cloud-platform airflow google-cloud-composer
Airflow 中的 Dataproc Spark 运算符如何返回值以及如何捕获该值。
我有一个下游作业来捕获此结果,并根据返回的值,我必须通过分支操作员触发另一个作业。
apache-spark google-cloud-dataproc airflow google-cloud-composer
我正在使用带有图像版本和Python 2.7的Google Cloud Composer(在Google云平台上管理Airflow)composer-0.5.3-airflow-1.9.0,我面临一个奇怪的问题:导入我的DAG后,它们无法从Web UI中点击(并且没有按钮) "Trigger DAG","Graph view",...),虽然在运行本地Airflow时都能正常工作.
即使在Composer上的Web服务器中不可用,我的DAG仍然存在.我可以使用CLI(list_dags)列出它们,描述它们(list_tasks)甚至触发它们(trigger_dag).
我用来重现问题的一个最小例子如下所示.使用钩子(此处GoogleCloudStorageHook)非常重要,因为使用钩子时会发生Composer上的错误.最初,我使用自定义挂钩(在自定义插件中),并且面临同样的问题.
这里,该示例基本上列出了GCS存储桶(my-bucket)中的所有条目,并为每个以条目开头的条目生成DAG my_dag.
import datetime
from airflow import DAG
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.operators.bash_operator import BashOperator
google_conn_id = 'google_cloud_default'
gcs_conn = GoogleCloudStorageHook(google_conn_id)
bucket = 'my-bucket'
prefix = 'my_dag'
entries = gcs_conn.list(bucket, prefix=prefix)
for entry in entries:
dag_id = str(entry)
dag = DAG(
dag_id=dag_id,
start_date=datetime.datetime.today(),
schedule_interval='0 0 1 * *'
) …Run Code Online (Sandbox Code Playgroud) python python-2.7 airflow airflow-scheduler google-cloud-composer
仅针对某些故障/异常重试 Airflow 运算符的最佳方法是什么?
例如,假设我有一个 Airflow 任务,该任务依赖于外部服务的可用性。如果该服务在任务执行期间变得不可用,我想稍后重试(最多重试 3 次)。对于其他失败我不想重试。
我当前的方法是通过解析来使用on_failure_callback和操作context["ti"].task.retries所需的异常context["exception"],但我认为这很混乱且难以理解。有更好的选择吗?
我正在使用Cloud Composer,我注意到它为我选择了Apache Airflow和Python(2.7.x)的版本.我想使用不同版本的Airflow和/或Python.我怎么能改变这个?
我的 DAG 看起来像这样
default_args = {
'start_date': airflow.utils.dates.days_ago(0),
'retries': 0,
'dataflow_default_options': {
'project': 'test',
'tempLocation': 'gs://test/dataflow/pipelines/temp/',
'stagingLocation': 'gs://test/dataflow/pipelines/staging/',
'autoscalingAlgorithm': 'BASIC',
'maxNumWorkers': '1',
'region': 'asia-east1'
}
}
dag = DAG(
dag_id='gcs_avro_to_bq_dag',
default_args=default_args,
description='ETL for loading data from GCS(present in the avro format) to BQ',
schedule_interval=None,
dagrun_timeout=datetime.timedelta(minutes=30))
task = DataFlowJavaOperator(
task_id='gcs_avro_to_bq_flow_job',
jar='gs://test/dataflow/pipelines/jobs/test-1.0-SNAPSHOT.jar',
poll_sleep=1,
options={
'input': '{{ ts }}',
},
dag=dag)
Run Code Online (Sandbox Code Playgroud)
我的 DAG 正在执行一个 jar 文件。jar 文件包含运行数据流作业的代码,该作业将数据从 GCS 写入 BQ。jar 本身成功执行。
当我尝试执行气流作业时,我看到以下错误
[2020-05-20 17:20:41,934] {base_task_runner.py:101} INFO - Job 274: Subtask gcs_avro_to_bq_flow_job [2020-05-20 17:20:41,840] …Run Code Online (Sandbox Code Playgroud) directed-acyclic-graphs google-cloud-platform airflow google-cloud-composer
当我需要在本地重新启动网络服务器时,我会这样做:
ps -ef | grep airflow | awk '{print $2}' | xargs kill -9
airflow webserver -p 8080 -D
Run Code Online (Sandbox Code Playgroud)
如何在 Google Composer 上执行此操作?我在控制台中没有看到重启服务器的选项。
python google-compute-engine google-cloud-platform airflow google-cloud-composer
在我的本地机器上,我创建了一个 virtualenv 并安装了 Airflow。当 dag 或插件需要 python 库时,我将它安装到同一个 virtualenv 中。
如何跟踪哪些库属于 dag,哪些库用于气流本身?我最近删除了一个 dag 并想删除它正在使用的库。这非常耗时,我交叉手指我没有删除另一个 dag 正在使用的东西!
当我使用像BigQueryHook这样的运算符/挂钩时,我看到一条消息,表明这些运算符已被弃用,并使用airflow.gcp ...运算符版本。但是,当我尝试在dag中使用它时,它失败并说没有名为airflow.gcp的模块。我拥有带beta功能的最新气流作曲家版本python3。是否可以通过某种方式安装这些运算符?
我正在尝试使用梁2.15在python 3中运行数据流作业。我已经尝试过virtualenv运算符,但这不起作用,因为它只允许使用python2.7。我怎样才能做到这一点?
python-3.x google-cloud-dataflow airflow google-cloud-composer airflow-operator
我想使用保存在airflow使用 KubernetesPodOperator.
在开发映像时,我使用环境变量将数据库连接信息传递给容器,但生产环境将数据库保存为连接挂钩。
提取数据库连接信息并将其传递给容器的最佳方法是什么?
env_vars = {'database_usr': 'xxx', 'database_pas': 'xxx'}
Run Code Online (Sandbox Code Playgroud)
KubernetesPodOperator(
dag=dag,
task_id="example-task",
name="example-task",
namespace="default",
image="eu.gcr.io/repo/image:tag",
image_pull_policy="Always",
arguments=["-v", "image-command", "image-arg"],
env_vars=env_vars,
)
Run Code Online (Sandbox Code Playgroud) kubernetes airflow google-cloud-composer kubernetes-operator