根据AWS Athena 限制,您一次最多可以提交 20 个相同类型的查询,但这是一个软限制,可以根据要求增加。我曾经boto3
与 Athena 进行交互,我的脚本提交了 16 个 CTAS 查询,每个查询大约需要 2 分钟才能完成。在 AWS 账户中,只有我在使用 Athena 服务。但是,当我通过控制台查看查询状态时,我发现尽管所有查询都处于 state 状态,但实际上只有少数查询(平均 5 个)正在执行Running
。以下是通常会在 Athena 历史选项卡中看到的内容:
我了解,在我向 Athena 提交查询后,它会根据整体服务负载和传入请求的数量分配资源来处理查询。但是我尝试在不同的日期和时间运行它们,仍然会同时执行大约 5 个查询。
所以我的问题是它应该如何?如果是这样,那么如果其中大约 15 个查询处于空闲状态并等待可用插槽,那么能够提交多达 20 个查询又有什么意义呢?
刚刚在 presto 文档中偶然发现了 HIVE CONNECTOR,其中有一节AWS Glue Catalog Configuration Properties。在那里我们可以看到
hive.metastore.glue.max-connections
:到 Glue 的最大并发连接数(默认为 5)。
这让我想知道它是否与我的问题有关。据我了解,Athena 只是一个在 EMR 集群上运行的 Presto,该集群配置为使用 AWS Glue 数据目录作为 Metastore。
那么,如果我的问题来自这样一个事实,即 Athena 的 EMR 集群只是使用默认值来连接到 Glue 的并发连接,即 5,这正是在我的情况下实际执行(平均)并发查询的数量。
Athena 团队最近为 Athena 部署了许多新功能。虽然 …
我正在尝试DockerOperator
在 Mac 上使用 Airflow。我正在运行基于Puckel 的Airflow,并做了一些小的修改。
Dockerfile 构建为 puckle-airflow-with-docker-inside:
FROM puckel/docker-airflow:latest
USER root
RUN groupadd --gid 999 docker \
&& usermod -aG docker airflow
USER airflow
Run Code Online (Sandbox Code Playgroud)
docker-compose-CeleryExecutor.yml.:
FROM puckel/docker-airflow:latest
USER root
RUN groupadd --gid 999 docker \
&& usermod -aG docker airflow
USER airflow
Run Code Online (Sandbox Code Playgroud)
DAG 中的任务/操作定义:
version: '2.1'
services:
redis:
image: 'redis:5.0.5'
postgres:
image: postgres:9.6
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
webserver:
image: puckel-airflow-with-docker-inside:latest
restart: always
depends_on:
- postgres
- redis
environment:
- LOAD_EX=n …
Run Code Online (Sandbox Code Playgroud) 我知道如何通过伪列获取文件的路径,例如:
select "$path" from table
Run Code Online (Sandbox Code Playgroud)
但我也想把文件大小放在旁边,可以吗?提前致谢。
我使用airflow python 运算符对 redshift/postgres 数据库执行 sql 查询。为了进行调试,我希望 DAG 返回 sql 执行的结果,类似于在控制台本地执行时看到的结果:
我用来psycop2
创建连接/游标并执行 sql。记录下来对于确认解析的参数化 SQL 以及确认数据确实已插入非常有帮助(我曾经痛苦地经历过环境差异导致意外行为的问题)
我对气流或 python DBAPI 的低级工作原理没有深入的了解,但文档pscyopg2
似乎确实提到了一些可能允许这样做的方法和连接配置。
我发现非常令人困惑的是,这很难做到,因为我想象这将是在此平台上运行 ETL 的主要用例。我听说过简单地创建额外的任务来查询表之前和之后的建议,但这似乎笨拙且无效。
谁能解释一下这如何可能,如果不可能,请解释为什么?欢迎实现类似结果的替代方法。谢谢!
到目前为止我已经尝试过该connection.status_message()
方法,但它似乎只返回sql的第一行而不是结果。我还尝试创建一个日志游标,它生成 sql,但不生成控制台结果
import logging
import psycopg2 as pg
from psycopg2.extras import LoggingConnection
conn = pg.connect(
connection_factory=LoggingConnection,
...
)
conn.autocommit = True
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
logger.addHandler(logging.StreamHandler(sys.stdout))
conn.initialize(logger)
cur = conn.cursor()
sql = """
INSERT INTO mytable (
SELECT *
FROM other_table
);
"""
cur.execute(sql)
Run Code Online (Sandbox Code Playgroud)
我希望记录器返回类似以下内容:
sql> INSERT INTO mytable (
SELECT …
Run Code Online (Sandbox Code Playgroud) 我得到了一个非常简单的查询,当在同一硬件上运行Spark SQL和Presto时(3小时vs 3分钟),显示出显着的性能差异。
SELECT field
FROM test1
WHERE field NOT IN (SELECT field FROM test2)
Run Code Online (Sandbox Code Playgroud)
经过对查询计划的研究,我发现原因是Spark SQL如何处理NOT IN
谓词子查询。为了正确处理NOT IN的NULL,Spark SQL将NOT IN
谓词转换为Left AntiJoin( (test1=test2) OR isNULL(test1=test2))
。
Spark SQL引入OR isNULL(test1=test2)
了确保的正确语义NOT IN
。
但是,OR
Left AntiJoin连接谓词的唯一可行的物理连接策略Left AntiJoin
是BroadcastNestedLoopJoin
。在当前阶段,我可以将NOT IN改写为NOT EXISTS来解决此问题。在NOT EXISTS的查询计划中,我可以看到join谓词是Left AntiJoin(test1=test2)
为NOT EXISTS(5分钟完成)导致更好的物理联接运算符的原因。
到目前为止,我很幸运,因为我的数据集当前没有任何NULL
属性,但是将来可能会具有,而NOT IN的语义正是我真正想要的。
所以我检查了Presto的查询计划,它没有真正提供,Left AntiJoin
但SemiJoin
与一起使用FilterPredicate = not (expr)
。Presto的查询计划没有提供太多信息,例如Spark。
所以我的问题更像是:
我可以假设Presto有更好的物理联接运算符来处理NOT IN
操作吗?与Spark SQL不同,它不依赖于连接谓词的重写isnull(op1 = op2)
来确保逻辑计划级别中NOT …
我正在学习气流,我必须创建一个将连接到远程 oracle 数据库并将数据从一个表插入到另一个表的 dag。以下是我到目前为止所做的
pip install 'apache-airflow[oracle]'
以便我可以使用oracle operator在我尝试找出它没有运行的原因之前,我想确定是否必须执行一些其他步骤,我可能会跳过一些其他步骤,例如airflow.cfg
文件中的某些更改,尤其是在sql_alchemy_conn
(我在主目录中找到的这个文件)中,或者是否cx_oracle
需要安装以便我可以连接到远程数据库或只需要连接字符串(上面的第 2 步)。
提到了很多关于气流的事情,但还不清楚确切的步骤。如果有人能在这方面帮助我,那将不胜感激。这些可能是基础知识,但我一直坚持到底要做什么。请帮忙
谢谢!!
我在AWS Athena中有一个表,该表的列名为“ servertime”,数据类型为timestamp。我运行这样的查询
select *
from table_name
where servertime between '2018-04-01 00:00:00' and '2018-04-05 23:59:59';
Run Code Online (Sandbox Code Playgroud)
它给了我这个错误: 您的查询具有以下错误:SYNTAX_ERROR:行1:41:'='不能应用于时间戳,varchar(19)
我该如何在雅典娜解决这个问题?从表中获取数据是重要的查询。
我无法弄清楚我的 CTAS 查询出了什么问题,即使我没有提到任何分桶列,它也会在存储在分区内时将数据分解成更小的文件。有没有办法避免这些小文件并将每个分区存储为一个文件,因为小于 128 MB 的文件会导致额外的开销?
CREATE TABLE sampledb.yellow_trip_data_parquet
WITH(
format = 'PARQUET'
parquet_compression = 'GZIP',
external_location='s3://mybucket/Athena/tables/parquet/'
partitioned_by=ARRAY['year','month']
)
AS SELECT
VendorID,
tpep_pickup_datetime,
tpep_dropoff_datetime,
passenger_count,
trip_distance,
RatecodeID,
store_and_fwd_flag,
PULocationID,
DOLocationID,
payment_type,
fare_amount,
extra,
mta_tax,
tip_amount,
tolls_amount,
improvement_surcharge,
total_amount,
date_format(date_parse(tpep_pickup_datetime,'%Y-%c-%d %k:%i:%s'),'%Y') AS year,
date_format(date_parse(tpep_pickup_datetime,'%Y-%c-%d %k:%i:%s'),'%c') AS month
FROM sampleDB.yellow_trip_data_raw;
Run Code Online (Sandbox Code Playgroud)
根据上一篇文章,我有一个 AWS Glue Pythonshell 作业,需要从通过 boto3 调用传递给它的参数中检索一些信息。
我的胶水工作名称是test_metrics
Glue pythonshell 代码如下所示
import sys
from awsglue.utils import getResolvedOptions
args = getResolvedOptions(sys.argv,
['test_metrics',
's3_target_path_key',
's3_target_path_value'])
print ("Target path key is: ", args['s3_target_path_key'])
print ("Target Path value is: ", args['s3_target_path_value'])
Run Code Online (Sandbox Code Playgroud)
调用此作业的 boto3 代码如下:
glue = boto3.client('glue')
response = glue.start_job_run(
JobName = 'test_metrics',
Arguments = {
'--s3_target_path_key': 's3://my_target',
'--s3_target_path_value': 's3://my_target_value'
}
)
print(response)
Run Code Online (Sandbox Code Playgroud)
200
在本地计算机中运行 boto3 代码后,我看到了响应,但 Glue 错误日志告诉我:
test_metrics.py: error: the following arguments are required: --test_metrics
Run Code Online (Sandbox Code Playgroud)
我缺少什么?
我在 Athena 中有一个非常基本的 group by query,我想在其中使用别名。可以通过在 group by 中放置相同的引用来使示例工作,但是当存在复杂的列修改并且需要在两个地方复制逻辑时,这并不是很方便。我过去也这样做过,现在我有一个通过复制不起作用的声明。
问题:
SELECT
substr(accountDescriptor, 5) as account,
sum(revenue) as grossRevenue
FROM sales
GROUP BY account
Run Code Online (Sandbox Code Playgroud)
这将引发错误:
别名列“帐户”无法解析
以下工作,所以它是关于别名处理。
SELECT
substr(accountDescriptor, 5) as account,
sum(revenue) as grossRevenue
FROM sales
GROUP BY substr(accountDescriptor, 5)
Run Code Online (Sandbox Code Playgroud)