我对使用 apache 气流很陌生。我使用 pycharm 作为我的 IDE。我创建了一个项目(anaconda 环境),创建了一个包含 DAG 定义和 Bash 运算符的 python 脚本。当我打开我的气流网络服务器时,我的 DAGS 没有显示。仅显示默认示例 DAG。我的AIRFLOW_HOME变量包含~/airflow. 所以我在那里存储了我的 python 脚本,现在它显示了。
我如何在项目环境中使用它?
我是否在每个项目开始时更改环境变量?
有没有办法为每个项目添加特定的气流主目录?
我不想将我的 DAG 存储在默认的气流目录中,因为我想将它添加到我的 git 存储库中。请帮帮我。
我想将某些单词添加到 wordcloud 中使用的默认停用词列表中。当前代码:
all_text = " ".join(rev for rev in twitter_clean.text)
stop_words = ["https", "co", "RT"]
wordcloud = WordCloud(stopwords = stop_words, background_color="white").generate(all_text)
plt.imshow(wordcloud, interpolation='bilinear')
plt.axis("off")
plt.show()
Run Code Online (Sandbox Code Playgroud)
当我使用自定义 stop_words 变量时,诸如 "is"、"was" 和 "the" 之类的词都被解释并显示为高频词。但是,当我使用默认的停用词列表(没有停用词参数)时,还有许多其他词显示为非常频繁。如何将我的自定义 stop_words 变量以及默认停用词列表添加到我的 wordcloud?
我正在尝试使用气流按计划使用气流在 Athena 中运行查询。
我在下面包含的函数是使用PythonOperatorin运行的airflow。
from airflow.models import Variable
from airflow.contrib.hooks.aws_athena_hook import AWSAthenaHook
import datetime
def update_athena_partition(*args, **kwargs):
execution_date = datetime.datetime.strptime(kwargs['ds'], '%Y-%m-%d')
execution_month = execution_date.month
execution_year = execution_date.year
s3_prefix = Variable.get('bikeshare_s3_prefix')
bucket_name = Variable.get('bikeshare_bucket_name')
athena_table_name = Variable.get('bikeshare_athena_table')
result_configuration = {"OutputLocation": "s3://{}/".format(bucket_name)}
file_location = 's3://bucket_name/' + s3_prefix + f'year=2018/month=2/'
partition_update_query = """
ALTER TABLE {} add partition (year="{}", month='{}')
location "{}";
"""
athena_hook = AWSAthenaHook(aws_conn_id='aws_credentials')
athena_hook.run_query(partition_update_query.format(athena_table_name,
2018,
2,
file_location),
result_configuration=result_configuration,
query_context="athena_database_name")
Run Code Online (Sandbox Code Playgroud)
这是我的 DAG
etl_dag = …Run Code Online (Sandbox Code Playgroud) airflow ×2
python ×2
apache ×1
boto3 ×1
etl ×1
matplotlib ×1
stop-words ×1
virtualenv ×1
word-cloud ×1