我有所有运行的BigQuery连接器,但是我希望在Cloud Composer而不是App Engine Flexible上计划在Docker容器中有一些现有脚本。
我有以下脚本似乎遵循了我可以找到的示例:
import datetime
from airflow import DAG
from airflow import models
from airflow.operators.docker_operator import DockerOperator
yesterday = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time())
default_args = {
# Setting start date as yesterday starts the DAG immediately
'start_date': yesterday,
# If a task fails, retry it once after waiting at least 5 minutes
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
}
schedule_interval = '45 09 * * *'
dag = DAG('xxx-merge', default_args=default_args, schedule_interval=schedule_interval)
hfan = DockerOperator(
task_id = 'hfan',
image = …Run Code Online (Sandbox Code Playgroud) directed-acyclic-graphs docker airflow google-cloud-composer
鉴于GCP composer与GKE / GCE一起运行,是否可以自动缩放?现在,集群中有3个节点可以支持100个DAG。
后来,如果我有大约300个DAG,它会自我扩展(带有芹菜工人)吗?
我BigQueryOperator在 Google Cloud Composer 上的 Airflow DAG 中广泛使用。
对于较长的查询,最好将每个查询放在自己的.sql文件中,而不是用它来弄乱 DAG。Airflow 似乎支持所有 SQL 查询运算符,包括 BigQueryOperator,如您在文档中所见。
我的问题:在.sql模板文件中编写了我的 sql 语句后,如何将其添加到 Google Cloud Composer 并在 DAG 中引用它?
我使用BashOperator实现了一些任务。使用“ gsutil rm”和“ gsutil cp”的用户可以正常工作。但是使用“ gcloud alpha firestore export”的人会产生此错误:
{bash_operator.py:101} INFO - ERROR: (gcloud.alpha.firestore.export) PERMISSION_DENIED: The caller does not have permission
Run Code Online (Sandbox Code Playgroud)
该命令本身在gcloud shell中可以正常工作。我尝试为Composer使用的服务帐户提供一些与Firestore相关的权限,但是它仍然不起作用。任何的想法
在使用 DataFlowPythonOperator 之前,我使用了气流的 BashOperator。它工作正常。我的光束管道需要一个特定的参数,这是我在 BashOperator 中使用的命令。
仅供参考 - 此光束管道用于将 CSV 文件转换为镶木地板。
python /home/airflow/gcs/pyFile.py --runner DataflowRunner --project my-project --jobname my-job--num-workers 3 --temp_location gs://path/Temp/ --staging_location gs://path/Staging/ --input gs://path/*.txt --odir gs://path/output --ofile current
Run Code Online (Sandbox Code Playgroud)
这些是我必须传递的必需参数,以使我的光束管道正常工作。
现在如何在DataFlowPythonOperator 中传递这些参数?
我试过了,但我不知道我应该在哪里提到所有参数。我试过这样的事情:
task1 = DataFlowPythonOperator(
task_id = 'my_task',
py_file = '/home/airflow/gcs/pyfile.py',
gcp_conn_id='google_cloud_default',
options={
"num-workers" : 3,
"input" : 'gs://path/*.txt',
"odir" : 'gs://path/',
"ofile" : 'current',
"jobname" : 'my-job'
},
dataflow_default_options={
"project": 'my-project',
"staging_location": 'gs://path/Staging/',
"temp_location": 'gs://path/Temp/',
},
dag=dag
)
Run Code Online (Sandbox Code Playgroud)
使用当前的脚本(虽然我不确定它的格式是否正确),这是我在日志中得到的:
[2020-03-06 05:08:48,070] {base_task_runner.py:115} INFO - Job …Run Code Online (Sandbox Code Playgroud) 对于一个要求,我想从云作曲家管道内部调用/调用云函数,但我找不到太多相关信息,我尝试使用 SimpleHTTP 气流运算符,但收到此错误:
[2021-09-10 10:35:46,649] {taskinstance.py:1503} ERROR - Task failed with exception
Traceback (most recent call last):
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1158, in
_run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1333, in
_prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1363, in
_execute_task
result = task_copy.execute(context=context)
File "/home/airflow/gcs/dags/to_gcf.py", line 51, in execute
if not self.response_check(response):
File "/home/airflow/gcs/dags/to_gcf.py", line 83, in <lambda>
response_check=lambda response: False if len(response.json()) == 0 else True,
File "/opt/python3.8/lib/python3.8/site-packages/requests/models.py", line 900, in json
return complexjson.loads(self.text, **kwargs)
File "/opt/python3.8/lib/python3.8/json/__init__.py", line …Run Code Online (Sandbox Code Playgroud) python google-cloud-platform airflow google-cloud-functions google-cloud-composer
我有这样的目录结构:
\nairflow_dags\n\xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 dags\n\xe2\x94\x82 \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 hk \n\xe2\x94\x82 \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 hk_dag.py \n\xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 plugins\n\xe2\x94\x82 \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 cse \n\xe2\x94\x82 \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 operators.py \n\xe2\x94\x82 \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 cse_to_bq.py \n\xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 test\n \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 dags \n \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 dag_test.py \nRun Code Online (Sandbox Code Playgroud)\n在 Cloud Composer 创建的 GCS 存储桶中,有一个插件文件夹,我将cse文件夹上传到其中。
现在在我的hk_dag.py文件中,如果我像这样导入插件:
from plugins.cse.operators.cse_to_bq import CSEToBQOperator\nRun Code Online (Sandbox Code Playgroud)\n并运行我的单元测试,它通过了,但在云编辑器中我收到一条ModuleNotFoundError: No module named 'plugins'错误消息。
如果我在我的中导入这样的插件hk_dag.py:
from cse.operators.cse_to_bq import CSEToBQOperator\nRun Code Online (Sandbox Code Playgroud)\n我的单元测试失败了,ModuleNotFoundError: No module named 'cse'但在 Cloud Composer 中运行良好。
我该如何解决?
\n我正在尝试在 Composer 2 环境中运行 GKEStartPodOperator/KubernetesPodOperator 任务,该环境在自动驾驶模式下使用 GKE 集群。我们现有的 Composer 1 环境中的 GKE 集群不处于自动驾驶模式。我们使用 Google Cloud Platform 服务(BigQuery、GCS 等)进行身份验证的任务在 Composer 2 环境中失败并出现 401 未经授权,但在 Composer 1 环境中成功。
在日志文件中,我可以看出两个环境中的任务都是通过向元数据服务器发出请求来获取凭据的。主要区别是 Composer 1 中的任务请求分配给任务运行所在节点的服务帐户,但 Composer 2 中的任务请求似乎是工作负载身份池,例如[project-name].svc.id.goog.
Composer 1 的日志是:
[2021-10-22 12:38:01,349] {pod_launcher.py:148} INFO - DEBUG:google.auth._default:Checking None for explicit credentials as part of auth process...
[2021-10-22 12:38:01,351] {pod_launcher.py:148} INFO - DEBUG:google.auth._default:Checking Cloud SDK credentials as part of auth process...
[2021-10-22 12:38:01,352] {pod_launcher.py:148} INFO - DEBUG:google.auth._default:Cloud SDK credentials not found on disk; not …Run Code Online (Sandbox Code Playgroud) service-accounts google-kubernetes-engine airflow google-cloud-composer workload-identity
我正在使用 GCP Composer 运行算法,在流结束时我想运行一个任务,该任务将执行多项操作,将文件和文件夹从卷复制和删除到存储桶我正在尝试执行这些复制和删除通过 a 进行操作kubernetespodoperator。我很难找到使用“cmds”运行多个命令的正确方法,我也尝试使用“cmds”和“arguments”。这是我KubernetesPodOperator尝试过的命令和参数组合:
post_algo_run = kubernetes_pod_operator.KubernetesPodOperator(
task_id="multi-coher-post-operations",
name="multi-coher-post-operations",
namespace="default",
image="google/cloud-sdk:alpine",
### doesn't work ###
cmds=["gsutil", "cp", "/data/splitter-output\*.csv", "gs://my_bucket/data" , "&" , "gsutil", "rm", "-r", "/input"],
#Error:
#[2022-01-27 09:31:38,407] {pod_manager.py:197} INFO - CommandException: Destination URL must name a directory, bucket, or bucket
#[2022-01-27 09:31:38,408] {pod_manager.py:197} INFO - subdirectory for the multiple source form of the cp command.
####################
### doesn't work ###
# cmds=["gsutil", "cp", "/data/splitter-output\*.csv", "gs://my_bucket/data ;","gsutil", "rm", "-r", "/input"],
# [2022-01-27 09:34:06,865] …Run Code Online (Sandbox Code Playgroud) I\xe2\x80\x99m 尝试执行下面的 dag。\n创建 dataproc 集群的操作员似乎无法启用可选组件来启用 jupyter 笔记本和 anaconda。\n我在这里找到了此代码:Component Gateway with DataprocOperator on Airflow尝试解决它,但对我来说它没有解决它,因为我认为这里的作曲家(气流)版本是不同的。我的版本是composer - 2.0.0-preview.5,airflow-2.1.4。
\n该操作员在创建集群时工作得很好,但它没有使用可选组件来创建以启用 Jupyter Notebook。\n有人有任何想法可以帮助我吗?
\nfrom airflow.contrib.sensors.gcs_sensor import GoogleCloudStoragePrefixSensor\nfrom airflow import DAG\nfrom datetime import datetime, timedelta\nfrom airflow.contrib.operators.dataproc_operator import DataprocClusterCreateOperator,DataprocClusterDeleteOperator, DataProcSparkOperator\nfrom airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator\n\nyesterday = datetime.combine(datetime.today() - timedelta(1),\n datetime.min.time())\n\n\ndefault_args = {\n \'owner\': \'teste3\',\n \'depends_on_past\': False,\n \'start_date\' :yesterday,\n \'email\': [\'airflow@example.com\'],\n \'email_on_failure\': False,\n \'email_on_retry\': False,\n \'retries\': 0,\n \'retry_delay\': timedelta(minutes=5),\n\n}\n\ndag = DAG(\n \'teste-dag-3\',catchup=False, default_args=default_args, schedule_interval=None)\n\n\n# configura os componentes\nclass CustomDataprocClusterCreateOperator(DataprocClusterCreateOperator):\n\n def __init__(self, *args, **kwargs):\n super(CustomDataprocClusterCreateOperator, self).__init__(*args, **kwargs)\n\n def _build_cluster_data(self):\n …Run Code Online (Sandbox Code Playgroud) google-cloud-platform google-cloud-dataproc airflow google-cloud-composer