我正在使用 Airflow 1.8.1,我想从 PostgreOperator 推送 sql 请求的结果。
这是我的任务:
check_task = PostgresOperator(
task_id='check_task',
postgres_conn_id='conx',
sql="check_task.sql",
xcom_push=True,
dag=dag)
def py_is_first_execution(**kwargs):
value = kwargs['ti'].xcom_pull(task_ids='check_task')
print 'count ----> ', value
if value == 0:
return 'next_task'
else:
return 'end-flow'
check_branch = BranchPythonOperator(
task_id='is-first-execution',
python_callable=py_is_first_execution,
provide_context=True,
dag=dag)
Run Code Online (Sandbox Code Playgroud)
这是我的 sql 脚本:
select count(1) from table
Run Code Online (Sandbox Code Playgroud)
当我检查 xcom 值时,check_task它会检索none值。
我想使用执行日期作为我的sql文件的参数:
我试过了
dt = '{{ ds }}'
s3_to_redshift = PostgresOperator(
task_id='s3_to_redshift',
postgres_conn_id='redshift',
sql='s3_to_redshift.sql',
params={'file': dt},
dag=dag
)
Run Code Online (Sandbox Code Playgroud)
但它不起作用.
我最近安装了apache airflow 1.8.1,我执行了以下命令:
airflow initdb
返回以下错误:
Traceback (most recent call last):
File "/usr/bin/airflow", line 18, in <module>
from airflow.bin.cli import CLIFactory
File "/usr/lib/python2.7/dist-packages/airflow/bin/cli.py", line 65, in <module>
auth=api.api_auth.client_auth)
AttributeError: 'module' object has no attribute 'client_auth'
Run Code Online (Sandbox Code Playgroud)
我尝试了几种解决方案,但它不起作用.
我有pyspark数据框,其中包含名为Filters的列:“ array>”
我想将数据帧保存在csv文件中,为此,我需要将数组转换为字符串类型。
我尝试将其强制转换为:DF.Filters.tostring()和DF.Filters.cast(StringType()),但两种解决方案均会在“过滤器”列中为每一行生成错误消息:
org.apache.spark.sql.catalyst.expressions.UnsafeArrayData@56234c19
代码如下
from pyspark.sql.types import StringType
DF.printSchema()
|-- ClientNum: string (nullable = true)
|-- Filters: array (nullable = true)
|-- element: struct (containsNull = true)
|-- Op: string (nullable = true)
|-- Type: string (nullable = true)
|-- Val: string (nullable = true)
DF_cast = DF.select ('ClientNum',DF.Filters.cast(StringType()))
DF_cast.printSchema()
|-- ClientNum: string (nullable = true)
|-- Filters: string (nullable = true)
DF_cast.show()
| ClientNum | Filters
| 32103 | org.apache.spark.sql.catalyst.expressions.UnsafeArrayData@d9e517ce
| …Run Code Online (Sandbox Code Playgroud) 我想使用从任务传递的一个参数来执行一个函数。
这是我的带有状态参数的函数:
def sns_notify(state):
client = boto3.client('sns')
if state == "failed":
message = config.get('sns', 'message') + state
else:
message = config.get('sns', 'message') + state
response = client.publish(TargetArn=config.get('sns', 'target_arn'),
Message=message,
Subject=config.get('sns', 'subject'))
return response
Run Code Online (Sandbox Code Playgroud)
这是我的任务,状态为参数:
t1 = DummyOperator(task_id='Dummy-1', trigger_rule=TriggerRule.ALL_SUCCESS,
on_success_callback=sns_notify("ok"), dag=dag)
t2 = DummyOperator(task_id='Dummy-2', trigger_rule=TriggerRule.ONE_FAILED,
on_success_callback=sns_notify("failed"), dag=dag)
Run Code Online (Sandbox Code Playgroud)
当我运行 dag 时,该函数不会停止发送邮件(对于本例)
我使用sqlalchemy和psycopg2将python连接到redshift.
engine = create_engine('postgresql://user:password@hostname:port/database_name')
Run Code Online (Sandbox Code Playgroud)
我想避免使用我的密码连接到redshift并使用IAM Role.
python postgresql sqlalchemy amazon-web-services amazon-redshift
我有一个 pandas 数据框,如下所示:
User | Query| Filters
-----------------------------------------------------------------------------------------
1 | abc | [{u'Op': u'and', u'Type': u'date', u'Val': u'1992'},{u'Op': u'and', u'Type': u'sex', u'Val': u'F'}]
1 | efg | [{u'Op': u'and', u'Type': u'date', u'Val': u'2000'},{u'Op': u'and', u'Type': u'col', u'Val': u'Blue'}]
1 | fgs | [{u'Op': u'and', u'Type': u'date', u'Val': u'2001'},{u'Op': u'and', u'Type': u'col', u'Val': u'Red'}]
2 | hij | [{u'Op': u'and', u'Type': u'date', u'Val': u'2002'}]
2 | dcv | [{u'Op': u'and', u'Type': u'date', u'Val': u'2001'},{u'Op': u'and', u'Type': u'sex', u'Val': u'F'}]
2 | …Run Code Online (Sandbox Code Playgroud) 我试图找到一种方法,在此命令中使用变量将 -10 替换为 n_days var:
n_days= -10
date_prefix=$(date -d '-10 day' +%Y/%m/%d)
Run Code Online (Sandbox Code Playgroud)
我尝试了这种方法,但没有成功:
date_prefix=$(date -d '${n_days} day' +%Y/%m/%d)
Run Code Online (Sandbox Code Playgroud) python ×6
airflow ×4
python-2.7 ×2
scheduler ×2
apache-spark ×1
bash ×1
dataframe ×1
date ×1
json ×1
linux ×1
pandas ×1
postgresql ×1
pyspark ×1
shell ×1
sql ×1
sqlalchemy ×1
unix ×1