小编Oma*_*r14的帖子

气流:如何从 PostgreOperator 推送 xcom 值?

我正在使用 Airflow 1.8.1,我想从 PostgreOperator 推送 sql 请求的结果。

这是我的任务:

check_task = PostgresOperator(
    task_id='check_task',
    postgres_conn_id='conx',
    sql="check_task.sql",
    xcom_push=True,
    dag=dag)

def py_is_first_execution(**kwargs):
    value = kwargs['ti'].xcom_pull(task_ids='check_task')
    print 'count ----> ', value
    if value == 0:
       return 'next_task'
    else:
       return 'end-flow'

check_branch = BranchPythonOperator(
    task_id='is-first-execution',
    python_callable=py_is_first_execution,
    provide_context=True,
    dag=dag)
Run Code Online (Sandbox Code Playgroud)

这是我的 sql 脚本:

select count(1) from table
Run Code Online (Sandbox Code Playgroud)

当我检查 xcom 值时,check_task它会检索none值。

python airflow apache-airflow airflow-scheduler

15
推荐指数
2
解决办法
7560
查看次数

Airflow:将{{ds}}作为参数传递给PostgresOperator

我想使用执行日期作为我的sql文件的参数:

我试过了

dt = '{{ ds }}'

s3_to_redshift = PostgresOperator(
    task_id='s3_to_redshift',
    postgres_conn_id='redshift',
    sql='s3_to_redshift.sql',
    params={'file': dt},
    dag=dag
)
Run Code Online (Sandbox Code Playgroud)

但它不起作用.

python scheduler airflow apache-airflow

12
推荐指数
1
解决办法
6533
查看次数

虽然气流initdb,但AttributeError:module'对象没有属性'client_auth'

我最近安装了apache airflow 1.8.1,我执行了以下命令:

airflow initdb

返回以下错误:

Traceback (most recent call last):
  File "/usr/bin/airflow", line 18, in <module>
    from airflow.bin.cli import CLIFactory
  File "/usr/lib/python2.7/dist-packages/airflow/bin/cli.py", line 65, in <module>
    auth=api.api_auth.client_auth)
AttributeError: 'module' object has no attribute 'client_auth'
Run Code Online (Sandbox Code Playgroud)

我尝试了几种解决方案,但它不起作用.

python-2.7 airflow apache-airflow

6
推荐指数
1
解决办法
3145
查看次数

Pyspark:将具有嵌套结构的数组转换为字符串

我有pyspark数据框,其中包含名为Filters的列:“ array>”

我想将数据帧保存在csv文件中,为此,我需要将数组转换为字符串类型。

我尝试将其强制转换为:DF.Filters.tostring()DF.Filters.cast(StringType()),但两种解决方案均会在“过滤器”列中为每一行生成错误消息:

org.apache.spark.sql.catalyst.expressions.UnsafeArrayData@56234c19

代码如下

from pyspark.sql.types import StringType

DF.printSchema()

|-- ClientNum: string (nullable = true)
|-- Filters: array (nullable = true)
    |-- element: struct (containsNull = true)
          |-- Op: string (nullable = true)
          |-- Type: string (nullable = true)
          |-- Val: string (nullable = true)

DF_cast = DF.select ('ClientNum',DF.Filters.cast(StringType())) 

DF_cast.printSchema()

|-- ClientNum: string (nullable = true)
|-- Filters: string (nullable = true)

DF_cast.show()

| ClientNum | Filters 
|  32103    | org.apache.spark.sql.catalyst.expressions.UnsafeArrayData@d9e517ce
| …
Run Code Online (Sandbox Code Playgroud)

python sql apache-spark pyspark spark-dataframe

4
推荐指数
2
解决办法
1万
查看次数

Airflow:当on_success_callback执行带参数的函数时

我想使用从任务传递的一个参数来执行一个函数。

这是我的带有状态参数的函数:

def sns_notify(state):
    client = boto3.client('sns')
    if state == "failed":
        message = config.get('sns', 'message') + state
    else:
        message = config.get('sns', 'message') + state
    response = client.publish(TargetArn=config.get('sns', 'target_arn'),
                              Message=message,
                              Subject=config.get('sns', 'subject'))
    return response
Run Code Online (Sandbox Code Playgroud)

这是我的任务,状态为参数:

t1 = DummyOperator(task_id='Dummy-1', trigger_rule=TriggerRule.ALL_SUCCESS,
                   on_success_callback=sns_notify("ok"), dag=dag)

t2 = DummyOperator(task_id='Dummy-2', trigger_rule=TriggerRule.ONE_FAILED,
                   on_success_callback=sns_notify("failed"), dag=dag)
Run Code Online (Sandbox Code Playgroud)

当我运行 dag 时,该函数不会停止发送邮件(对于本例)

python scheduler scheduled-tasks airflow

4
推荐指数
1
解决办法
9159
查看次数

使用IAM Role使用Python连接到Redshift

我使用sqlalchemy和psycopg2将python连接到redshift.

engine = create_engine('postgresql://user:password@hostname:port/database_name')
Run Code Online (Sandbox Code Playgroud)

我想避免使用我的密码连接到redshift并使用IAM Role.

python postgresql sqlalchemy amazon-web-services amazon-redshift

3
推荐指数
1
解决办法
3088
查看次数

如何将 Pandas DataFrame 中的字典列表展平为几列?

我有一个 pandas 数据框,如下所示:

User | Query|                                 Filters                 
----------------------------------------------------------------------------------------- 
1    |  abc | [{u'Op': u'and', u'Type': u'date', u'Val': u'1992'},{u'Op': u'and', u'Type': u'sex', u'Val': u'F'}]
1    |  efg | [{u'Op': u'and', u'Type': u'date', u'Val': u'2000'},{u'Op': u'and', u'Type': u'col', u'Val': u'Blue'}] 
1    |  fgs | [{u'Op': u'and', u'Type': u'date', u'Val': u'2001'},{u'Op': u'and', u'Type': u'col', u'Val': u'Red'}]        
2    |  hij | [{u'Op': u'and', u'Type': u'date', u'Val': u'2002'}]  
2    |  dcv | [{u'Op': u'and', u'Type': u'date', u'Val': u'2001'},{u'Op': u'and', u'Type': u'sex', u'Val': u'F'}]     
2    | …
Run Code Online (Sandbox Code Playgroud)

python json dataframe python-2.7 pandas

0
推荐指数
1
解决办法
3150
查看次数

Shell:将变量插入命令中

我试图找到一种方法,在此命令中使用变量将 -10 替换为 n_days var:

   n_days= -10
   date_prefix=$(date -d '-10 day' +%Y/%m/%d)
Run Code Online (Sandbox Code Playgroud)

我尝试了这种方法,但没有成功:

   date_prefix=$(date -d '${n_days} day' +%Y/%m/%d)
Run Code Online (Sandbox Code Playgroud)

unix linux bash shell date

-1
推荐指数
1
解决办法
1万
查看次数