我有这个查询:
SHOW PARTITIONS tablename;
Run Code Online (Sandbox Code Playgroud)
结果是:
dt=2018-01-12
dt=2018-01-20
dt=2018-05-21
dt=2018-04-07
dt=2018-01-03
Run Code Online (Sandbox Code Playgroud)
这给出了每个表的分区列表。该表的分区字段dt
是日期列。我想查看排序的分区。
该文档没有说明如何执行此操作:https : //docs.aws.amazon.com/athena/latest/ug/show-partitions.html
我尝试通过以下方式添加订单:
SHOW PARTITIONS tablename order by dt;
Run Code Online (Sandbox Code Playgroud)
但是它给出:
亚马逊雅典娜; 状态码:400;错误代码:InvalidRequestException;
第一个Dag 0 1 * * *
跑了没有任何问题.结束DAG 0 10 1 * *
没有运行.当我做:
import datetime
print datetime.datetime.now()
Run Code Online (Sandbox Code Playgroud)
我明白了:
2018-07-01 12:14:15.632812
Run Code Online (Sandbox Code Playgroud)
所以我不明白为什么没有安排这个DAG.我知道在10:00准确运行不是强制性的,但统计数据应该是Running
.
根据"最新运行"的第一个任务是2018-06-30 01:00
我怀疑我实际上并不了解Airflow时钟.从我的观点来看,最后一次运行是2018-07-01 01:00
因为它今天早上而不是昨天运行.
编辑: 我在documntation看到了这一段:
"请注意,如果您在一天的schedule_interval上运行DAG,则2016-01-01的运行标记将在2016-01-01T23:59之后很快触发.换句话说,作业实例将在其涵盖的时间段内启动结束了."
所以我想知道..我应该在我想要的实际日期前一天安排一切?所以,如果我真的想要运行一些东西,0 10 1 * *
我应该安排它 0 10 30 * *
?换句话说,如果我想在每个月的第一天10点运行一些东西,我应该把它安排到每个月的最后一天10点?
那逻辑在哪里?这很难理解和遵循.
它变得最糟糕,根据这个没有办法告诉调度程序这个输入.我是什么做的?!
我在没有FERNET_KEY的情况下启动了Airflow。意识到这一点后,我将执行以下操作:https : //airflow.apache.org/configuration.html#connections
pip install apache-airflow[crypto]
from cryptography.fernet import Fernet
fernet_key= Fernet.generate_key()
print(fernet_key)
Run Code Online (Sandbox Code Playgroud)
拿了钥匙并将其放入airflow.cfg
然后调用airflow initdb
,但是错误仍然出现。
我究竟做错了什么?
当我做:
airflow webserver -D
Run Code Online (Sandbox Code Playgroud)
我得到:
File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 713, in extra_dejson
if self.extra:
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/attributes.py", line 293, in __get__
return self.descriptor.__get__(instance, owner)
File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 632, in get_extra
return fernet.decrypt(bytes(self._extra, 'utf-8')).decode()
File "/usr/lib/python2.7/dist-packages/cryptography/fernet.py", line 101, in decrypt
raise InvalidToken
Run Code Online (Sandbox Code Playgroud)
日志表明此代码存在问题:
def get_conn(conn_id, session=None):
conn = (session.query(Connection)
.filter(Connection.conn_id == conn_id)
.first())
return conn
def my_python_function():
conn = get_conn('s3connection')
key_id = …
Run Code Online (Sandbox Code Playgroud) 我在 Python 脚本中有以下代码:
from google.oauth2 import service_account
SCOPES = ['https://www.googleapis.com/auth/sqlservice.admin']
SERVICE_ACCOUNT_FILE = 'JSON_AUTH_FILE'
credentials = service_account.Credentials.from_service_account_file(
SERVICE_ACCOUNT_FILE, scopes=SCOPES)
import os
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = SERVICE_ACCOUNT_FILE
Run Code Online (Sandbox Code Playgroud)
这可以通过谷歌进行身份验证。
现在我想列出所有存储桶:
from google.cloud import storage
storage_client = storage.Client()
buckets = list(storage_client.list_buckets())
print(buckets)
Run Code Online (Sandbox Code Playgroud)
但这行不通。我得到:
google.api_core.exceptions.Forbidden: 403
xxx@yyy.iam.gserviceaccount.com does not have storage.buckets.list access to project
Run Code Online (Sandbox Code Playgroud)
当我点击它时,我看到它还有一个链接(这很奇怪,因为它说403,但这里显示401:
{
"error": {
"errors": [
{
"domain": "global",
"reason": "required",
"message": "Anonymous caller does not have storage.buckets.list access to project NUMBER.",
"locationType": "header",
"location": "Authorization"
}
],
"code": 401,
"message": "Anonymous caller …
Run Code Online (Sandbox Code Playgroud) 我有像Amazon S3连接设置等数据..这与我的DAG文件夹中的许多.py文件相关.
我知道default_args
在特定的.py文件中与所有DAGS共享我的问题是我如何定义可以访问DAG文件夹中所有 .py文件的数据?
想象一下,我需要更改我的S3连接细节...我不想翻阅所有文件并逐个更改.我想在一个地方改变它.
Airflow是否支持此功能?
我正在这里查看示例代码
有两个“操作”功能:
def my_sleeping_function(random_base):
"""This is a function that will run within the DAG execution"""
time.sleep(random_base)
Run Code Online (Sandbox Code Playgroud)
和:
def print_context(ds, **kwargs):
pprint(kwargs)
print(ds)
return 'Whatever you return gets printed in the logs'
Run Code Online (Sandbox Code Playgroud)
对于my_sleeping_function
我们运行的每次运行print_context
?
我不明白的是顺序。这是图形和树..执行顺序不一样:
首先会发生什么?之后会发生什么?为什么?
我假设根据这个:
for i in range(5):
task = PythonOperator(
task_id='sleep_for_' + str(i),
python_callable=my_sleeping_function,
op_kwargs={'random_base': float(i) / 10},
dag=dag)
task.set_upstream(run_this)
Run Code Online (Sandbox Code Playgroud)
run_this 执行然后任务执行但循环让我感到困惑。
在Admin->Connection
我设置Conn Type S3
.
基本上我在My Python脚本中有这个代码:
if __name__ == '__main__':
AWS_ACCESS_KEY_ID = "..."
AWS_SECRET_ACCESS_KEY = "..."
AWS_DEFAULT_REGION = "..."
Start_Work
Run Code Online (Sandbox Code Playgroud)
我想要做的是从Airflow调用我的脚本并将连接的参数传递给它(而不是在脚本中硬编码).
我怎么做?
如何访问每个提交的数据?
我有以下DAG:
default_args = {
'owner': 'Airflow',
'depends_on_past': False,
'start_date': datetime(2018, 07, 19, 11,0,0),
'email': ['me@me.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 5,
'retry_delay': timedelta(minutes=2),
'catchup' : False,
'depends_on_past' : False,
}
with DAG('some_dag', schedule_interval=timedelta(minutes=30), max_active_runs=1, default_args=default_args) as dag:
Run Code Online (Sandbox Code Playgroud)
该dag每30分钟运行一次。它重写表中的数据(全部删除并写入)。因此,如果气流中断了2天,那么在此期间运行所有丢失的dag运行就没有意义了。
但是,以上定义不起作用。气流停止两天后,它仍然尝试运行所有丢失的任务。
我该如何解决?
我有这个 Python 文件:
class Get:
def __init__(self, i):
self.i = get_date(i)
self.df = self.get_file()
def get_file(self):
try:
...
return df
except Exception as e:
return ...
def get_date(self,i):
dt = datetime.now() - timedelta(days=i)
return dt.strftime("%Y-%m-%d")
def put(self,df):
....
class Fix:
def __init__(self,df):
....
if __name__ == '__main__':
for i in range(4, 0, -1):
get = Get(i)
fix = Fix(get.df)
get.put(fix.df)
Run Code Online (Sandbox Code Playgroud)
基本上这段代码生成 4 个最后的日期并在这些日期上运行函数(更新统计信息等...)
起初我想将每个函数转换为 PythonOperator 然后安排它,但我认为这行不通。我不知道如何转换 Classes 和它们之间传输的参数。
如果我在 2018 年 6 月 12 日及以下运行它,代码会执行以下操作:
是否有我可以使用的模板或任何建议?
假设我想编写一个DAG来显示Redshift特定模式中的所有表.该SQL
查询Show Tables;
如何为它创建DAG?我认为这应该是这样的:
dag = airflow.DAG(
'process_dimensions',
schedule_interval="@daily",
dagrun_timeout=timedelta(minutes=60),
default_args=args,
max_active_runs=1)
process_product_dim = SQLOperator(
task_id='process_product_dim',
conn_id='??????',
sql='Show Tables',
dag=dag)
Run Code Online (Sandbox Code Playgroud)
有谁知道如何正确写它?