Apache Airflow与Google云pubsub的集成

Rav*_*avi 6 google-cloud-pubsub airflow apache-airflow

我对气流很新,并尝试使用apache airflow与google pubsub的集成,我想这是在"Airflow-300"JIRA下添加的.如果我在这里读错了,请纠正我.

另外,您能否告知它是否已经发布或什么时候发布?我们正在考虑在Google云端存储上添加通知,在任何文件事件中,我们希望触发Airflow中的某些工作流程.

我似乎无法找到有关如何使用它的任何文档.

任何建议都将受到高度赞赏.

mik*_*laj 5

已经引入了在 Airflow 中的集成。

发布消息

from base64 import b64encode as b64e

m1 = {'data': b64e('Hello, World!'),
       'attributes': {'type': 'greeting'}
      }
m2 = {'data': b64e('Knock, knock')}
m3 = {'attributes': {'foo': ''}}

t1 = PubSubPublishOperator(
    topic='my_topic',
    messages=[m1, m2, m3],
    create_topic=True,
    dag=dag)
Run Code Online (Sandbox Code Playgroud)

接收消息

PubSubPullSensor(
    task_id='pub_sub_wait', 
    project='my_project',
    subscription='my-subscription',
    ack_messages=True)
Run Code Online (Sandbox Code Playgroud)

参考:

https://github.com/apache/incubator-airflow/commit/d231dce37d753ed196a26d9b244ddf376385de38 https://github.com/apache/incubator-airflow/commit/6645218092096e4b10fcbac7326a62e