小编jac*_*ack的帖子

在Amazon Athena中按顺序显示分区

我有这个查询:

SHOW PARTITIONS tablename;
Run Code Online (Sandbox Code Playgroud)

结果是:

dt=2018-01-12
dt=2018-01-20
dt=2018-05-21
dt=2018-04-07
dt=2018-01-03
Run Code Online (Sandbox Code Playgroud)

这给出了每个表的分区列表。该表的分区字段dt是日期列。我想查看排序的分区。

该文档没有说明如何执行此操作:https : //docs.aws.amazon.com/athena/latest/ug/show-partitions.html

我尝试通过以下方式添加订单:

SHOW PARTITIONS tablename order by dt;
Run Code Online (Sandbox Code Playgroud)

但是它给出:

亚马逊雅典娜; 状态码:400;错误代码:InvalidRequestException;

amazon-athena

6
推荐指数
3
解决办法
3093
查看次数

气流为什么调度程序无法启动我的DAG?

我有以下Dag: 在此输入图像描述

第一个Dag 0 1 * * *跑了没有任何问题.结束DAG 0 10 1 * * 没有运行.当我做:

import datetime
print datetime.datetime.now()
Run Code Online (Sandbox Code Playgroud)

我明白了:

2018-07-01 12:14:15.632812
Run Code Online (Sandbox Code Playgroud)

所以我不明白为什么没有安排这个DAG.我知道在10:00准确运行不是强制性的,但统计数据应该是Running.

根据"最新运行"的第一个任务是2018-06-30 01:00我怀疑我实际上并不了解Airflow时钟.从我的观点来看,最后一次运行是2018-07-01 01:00因为它今天早上而不是昨天运行.

编辑: 我在documntation看到了这一段:

"请注意,如果您在一天的schedule_interval上运行DAG,则2016-01-01的运行标记将在2016-01-01T23:59之后很快触发.换句话说,作业实例将在其涵盖的时间段内启动结束了."

所以我想知道..我应该在我想要的实际日期前一天安排一切?所以,如果我真的想要运行一些东西,0 10 1 * *我应该安排它 0 10 30 * *?换句话说,如果我想在每个月的第一天10点运行一些东西,我应该把它安排到每个月的最后一天10点?

那逻辑在哪里?这很难理解和遵循.

它变得最糟糕,根据这个没有办法告诉调度程序这个输入.我是什么做的?!

airflow

6
推荐指数
1
解决办法
3629
查看次数

损坏的DAG:[/ airflow / dags / a.py]无法为登录名解密`extra`参数=无,缺少FERNET_KEY配置

我在没有FERNET_KEY的情况下启动了Airflow。意识到这一点后,我将执行以下操作:https : //airflow.apache.org/configuration.html#connections

pip install apache-airflow[crypto]

from cryptography.fernet import Fernet
fernet_key= Fernet.generate_key()
print(fernet_key)
Run Code Online (Sandbox Code Playgroud)

拿了钥匙并将其放入airflow.cfg然后调用airflow initdb,但是错误仍然出现。

我究竟做错了什么?

当我做:

airflow webserver -D
Run Code Online (Sandbox Code Playgroud)

我得到:

  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 713, in extra_dejson
    if self.extra:
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/attributes.py", line 293, in __get__
    return self.descriptor.__get__(instance, owner)
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 632, in get_extra
    return fernet.decrypt(bytes(self._extra, 'utf-8')).decode()
  File "/usr/lib/python2.7/dist-packages/cryptography/fernet.py", line 101, in decrypt
    raise InvalidToken
Run Code Online (Sandbox Code Playgroud)

日志表明此代码存在问题:

def get_conn(conn_id, session=None):
    conn = (session.query(Connection)
                   .filter(Connection.conn_id == conn_id)
                   .first())
    return conn


def my_python_function():
   conn = get_conn('s3connection')
   key_id = …
Run Code Online (Sandbox Code Playgroud)

python airflow

5
推荐指数
3
解决办法
4223
查看次数

如何在 Python 中列出 Google Storage 中的存储桶?

我在 Python 脚本中有以下代码:

from google.oauth2 import service_account
SCOPES = ['https://www.googleapis.com/auth/sqlservice.admin']
SERVICE_ACCOUNT_FILE = 'JSON_AUTH_FILE'
credentials = service_account.Credentials.from_service_account_file(
            SERVICE_ACCOUNT_FILE, scopes=SCOPES)
import os
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = SERVICE_ACCOUNT_FILE
Run Code Online (Sandbox Code Playgroud)

这可以通过谷歌进行身份验证。

现在我想列出所有存储桶:

from google.cloud import storage
storage_client = storage.Client()
buckets = list(storage_client.list_buckets())
print(buckets)
Run Code Online (Sandbox Code Playgroud)

但这行不通。我得到:

google.api_core.exceptions.Forbidden: 403

xxx@yyy.iam.gserviceaccount.com does not have storage.buckets.list access to project
Run Code Online (Sandbox Code Playgroud)

当我点击它时,我看到它还有一个链接(这很奇怪,因为它说403,但这里显示401:

{
 "error": {
  "errors": [
   {
    "domain": "global",
    "reason": "required",
    "message": "Anonymous caller does not have storage.buckets.list access to project NUMBER.",
    "locationType": "header",
    "location": "Authorization"
   }
  ],
  "code": 401,
  "message": "Anonymous caller …
Run Code Online (Sandbox Code Playgroud)

python google-cloud-platform

4
推荐指数
1
解决办法
1万
查看次数

apache airflow为DAG文件夹中的所有脚本定义数据

我有像Amazon S3连接设置等数据..这与我的DAG文件夹中的许多.py文件相关.

我知道default_args在特定的.py文件中与所有DAGS共享我的问题是我如何定义可以访问DAG文件夹中所有 .py文件的数据?

想象一下,我需要更改我的S3连接细节...我不想翻阅所有文件并逐个更改.我想在一个地方改变它.

Airflow是否支持此功能?

airflow

3
推荐指数
1
解决办法
263
查看次数

Airflow 中的执行顺序

我正在这里查看示例代码

有两个“操作”功能:

def my_sleeping_function(random_base):
    """This is a function that will run within the DAG execution"""
    time.sleep(random_base)
Run Code Online (Sandbox Code Playgroud)

和:

def print_context(ds, **kwargs):
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'
Run Code Online (Sandbox Code Playgroud)

对于my_sleeping_function我们运行的每次运行print_context

我不明白的是顺序。这是图形和树..执行顺序不一样:

在此处输入图片说明 在此处输入图片说明

首先会发生什么?之后会发生什么?为什么?

我假设根据这个:

for i in range(5):
    task = PythonOperator(
        task_id='sleep_for_' + str(i),
        python_callable=my_sleeping_function,
        op_kwargs={'random_base': float(i) / 10},
        dag=dag)

    task.set_upstream(run_this)
Run Code Online (Sandbox Code Playgroud)

run_this 执行然后任务执行但循环让我感到困惑。

python airflow

2
推荐指数
1
解决办法
3935
查看次数

Apache AIRFLOW - 如何将参数发送到Python脚本

Admin->Connection我设置Conn Type S3.

基本上我在My Python脚本中有这个代码:

if __name__ == '__main__':
    AWS_ACCESS_KEY_ID = "..."
    AWS_SECRET_ACCESS_KEY = "..."
    AWS_DEFAULT_REGION = "..."
    Start_Work
Run Code Online (Sandbox Code Playgroud)

我想要做的是从Airflow调用我的脚本并将连接的参数传递给它(而不是在脚本中硬编码).

我怎么做?

编辑:让我们假设这是连接: 在此输入图像描述

如何访问每个提交的数据?

python airflow

2
推荐指数
1
解决办法
1281
查看次数

如何防止追赶DAG?

我有以下DAG:

default_args = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 07, 19, 11,0,0),
    'email': ['me@me.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 5,
    'retry_delay': timedelta(minutes=2),
    'catchup' : False,
    'depends_on_past' : False,
}


with DAG('some_dag', schedule_interval=timedelta(minutes=30), max_active_runs=1, default_args=default_args) as dag:
Run Code Online (Sandbox Code Playgroud)

该dag每30分钟运行一次。它重写表中的数据(全部删除并写入)。因此,如果气流中断了2天,那么在此期间运行所有丢失的dag运行就没有意义了。

但是,以上定义不起作用。气流停止两天后,它仍然尝试运行所有丢失的任务。

我该如何解决?

airflow

1
推荐指数
2
解决办法
782
查看次数

将 Python 文件转换为 Airflow DAG

我有这个 Python 文件:

class Get:

    def __init__(self, i):
        self.i = get_date(i)
        self.df = self.get_file()

    def get_file(self):
        try:
            ...
            return df
        except Exception as e:
            return ...

    def get_date(self,i):
        dt = datetime.now() - timedelta(days=i)
        return dt.strftime("%Y-%m-%d")

    def put(self,df):
        ....


class Fix:
    def __init__(self,df):
        ....

if __name__ == '__main__':
    for i in range(4, 0, -1):
        get = Get(i)
        fix = Fix(get.df)
        get.put(fix.df)
Run Code Online (Sandbox Code Playgroud)

基本上这段代码生成 4 个最后的日期并在这些日期上运行函数(更新统计信息等...)

起初我想将每个函数转换为 PythonOperator 然后安排它,但我认为这行不通。我不知道如何转换 Classes 和它们之间传输的参数。

如果我在 2018 年 6 月 12 日及以下运行它,代码会执行以下操作: 在此处输入图片说明

是否有我可以使用的模板或任何建议?

python airflow

0
推荐指数
1
解决办法
2001
查看次数

如何编写与Amazon Redshift连接的DAG?

假设我想编写一个DAG来显示Redshift特定模式中的所有表.该SQL查询Show Tables;

如何为它创建DAG?我认为这应该是这样的:

dag = airflow.DAG(
    'process_dimensions',
    schedule_interval="@daily",
    dagrun_timeout=timedelta(minutes=60),
    default_args=args,
    max_active_runs=1)

process_product_dim = SQLOperator(
    task_id='process_product_dim',
    conn_id='??????',
    sql='Show Tables',
    dag=dag)
Run Code Online (Sandbox Code Playgroud)

有谁知道如何正确写它?

airflow

0
推荐指数
1
解决办法
2363
查看次数