我正在尝试将从 json 响应创建的 df 摄取到现有表中(该表当前为空,因为我似乎无法让它工作)
df 类似于下表:
| 指数 | clicks_affiliate |
|---|---|
| 0 | 3214 |
| 1 | 2221 |
但我看到以下错误:
Snowflake.connector.errors.ProgrammingError:000904(42000):SQL编译错误:位置94处的错误行1无效标识符“clicks_affiliated”
雪花中的列名称与我的数据框中的列匹配。
这是我的代码:
import pandas as pd
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas, pd_writer
from pandas import json_normalize
import requests
df_norm = json_normalize(json_response, 'reports')
#I've tried also adding the below line (and removing it) but I see the same error
df = df_norm.reset_index(drop=True)
def create_db_engine(db_name, schema_name):
engine = URL(
account="ab12345.us-west-2",
user="my_user",
password="my_pw",
database="DB",
schema="PUBLIC",
warehouse="WH1",
role="DEV"
)
return …Run Code Online (Sandbox Code Playgroud) 我正在尝试在 AWS 中创建一个连接到 Snowflake 数据库的 lambda 函数。为此,我需要该snowflake-connector-python[pandas]包(https://docs.snowflake.com/en/user-guide/python-connector-pandas.html),该包及其所有依赖项的未压缩大小超过 250 MB(大约 280 MB)。这是一个问题,因为 AWS lambda 允许最大 250 MB 的依赖项(使用 AWS 层)。
包的大小相当令人惊讶,从依赖关系来看,最大的违规者是 pyarrow(大约 80 MB)、pandas(大约 60 MB)和 numpy(大约 40 MB)。有没有办法减小整个包的大小,只安装相关部分,从而将大小减小到 250 MB 以下?也就是说,我需要能够连接、读取和写入 Snowflake,没什么花哨的。
我知道在这些情况下还有其他选择,例如容器,但是如果可能的话我想避免这种情况。
amazon-web-services aws-lambda snowflake-cloud-data-platform python-3.9 snowflake-connector
因此,我将 DAG 从 Airflow 版本 1.12.15 升级到 2.2.2,并将 python 从 3.8 降级到 3.7(因为 MWAA 不支持 python 3.8)。DAG 在之前的设置中工作正常,但在 MWAA 设置中显示此错误:
Broken DAG: [/usr/local/airflow/dags/google_analytics_import.py] Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1474, in set_downstream
self._set_relatives(task_or_task_list, upstream=False, edge_modifier=edge_modifier)
File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1412, in _set_relatives
task_object.update_relative(self, not upstream)
AttributeError: 'DAG' object has no attribute 'update_relative'
Run Code Online (Sandbox Code Playgroud)
这是似乎失败的内置函数:
def set_downstream(
self,
task_or_task_list: Union[TaskMixin, Sequence[TaskMixin]],
edge_modifier: Optional[EdgeModifier] = None,
) -> None:
"""
Set a task or a task list to be directly downstream from …Run Code Online (Sandbox Code Playgroud) 我在尝试安装 Snowflake python 连接器时经历了一段非常糟糕的时光。这是一个令人沮丧的事情,但我终于成功地使用完整的 Ubuntu 基础 Docker 镜像来安装它。但我现在不知道如何让 AWS lambda 包装器工作。
\n项目结构。
\n\xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 Dockerfile\n\xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 app\n\xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 __init__.py\n\xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 app.py\n\xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 entry.sh\nRun Code Online (Sandbox Code Playgroud)\n入口.sh
\n#!/bin/sh\nif [ -z "${AWS_LAMBDA_RUNTIME_API}" ]; then\n\n exec /usr/bin/aws-lambda-rie /usr/local/bin/python3 -m awslambdaric $1\nelse\n exec /usr/local/bin/python3 -m awslambdaric $1\nfi\nRun Code Online (Sandbox Code Playgroud)\nDockerfile
\nARG FUNCTION_DIR="/home/app/"\n\nFROM ubuntu:20.04\nRUN apt-get update && apt-get install -y \\\n python3 \\\n python3-pip\nARG FUNCTION_DIR\nRUN mkdir -p ${FUNCTION_DIR}\nRUN pip install -U wheel pip --target ${FUNCTION_DIR}\nRUN pip install snowflake-connector-python==2.7.1 --target ${FUNCTION_DIR}\nRUN pip install awslambdaric --target ${FUNCTION_DIR}\nWORKDIR ${FUNCTION_DIR}\nADD …Run Code Online (Sandbox Code Playgroud) python-3.x docker aws-lambda aws-lambda-containers snowflake-connector
我看过其他问题,但似乎 Snowflake 不支持 sql 中的 if/else,至少不支持其他 sql 服务器支持它的方式。
有些人建议使用 javascript,但我想如果可以的话避免这样做。
我正在尝试使用 Snowflake python 库将一些数据插入表中,如果它已经存在,那么我想更新数据,我已经查看了合并,但它似乎不适合我,因为我的数据不是表
这就是我到目前为止不起作用的
f"""BEGIN
IF (EXISTS (SELECT * FROM {self.okr_table} WHERE kpi=TRUE AND Month(month)=MONTH(current_date()) AND year(month)=YEAR(current_date())))
THEN
UPDATE {self.okr_table} SET [DATA] = {json.dumps(self.data)} WHERE kpi=TRUE AND Month(month)=MONTH(current_date()) AND year(month)=YEAR(current_date()))
ELSE
INSERT INTO {self.okr_table} (month, data, kpi) SELECT current_date(),parse_json('{json.dumps(self.data)}'), true;
END"""
Run Code Online (Sandbox Code Playgroud) 我正在尝试将 pandas 数据帧加载到雪花表中。
这是我的 SF 表 DDL:
create or replace TABLE MY_TABLE (
WEBSITE VARCHAR(16777216),
ORGANIZATION_NAME VARCHAR(16777216),
ORGANIZATION_NAME_URL VARCHAR(16777216),
IPO_STATUS VARCHAR(16777216),
FOUNDED_DATE VARCHAR(16777216),
FOUNDED_DATE_PRECISION VARCHAR(16777216),
TOTAL_FUNDING_AMOUNT VARCHAR(16777216),
TOTAL_FUNDING_AMOUNT_CURRENCY VARCHAR(16777216),
TOTAL_FUNDING_AMOUNT_CURRENCY_USD VARCHAR(16777216),
LAST_FUNDING_AMOUNT VARCHAR(16777216),
LAST_FUNDING_AMOUNT_CURRENCY VARCHAR(16777216),
LAST_FUNDING_AMOUNT_CURRENCY_USD VARCHAR(16777216),
LAST_FUNDING_DATE VARCHAR(16777216),
INDUSTRIES VARCHAR(16777216),
INDUSTRY_GROUPS VARCHAR(16777216),
LAST_FUNDING_TYPE VARCHAR(16777216),
IT_SPEND VARCHAR(16777216),
IT_SPEND_CURRENCY VARCHAR(16777216),
IT_SPEND_CURRENCY_USD VARCHAR(16777216)
);
Run Code Online (Sandbox Code Playgroud)
这是我的数据框的结构:
Data columns (total 19 columns):
# Column Non-Null Count Dtype
--- ------ -------------- -----
0 Website 3924 non-null object
1 Organization Name 3924 non-null object
2 Organization …Run Code Online (Sandbox Code Playgroud) python pandas snowflake-cloud-data-platform snowflake-connector
snowflake-cloud-data-platform ×4
aws-lambda ×2
pandas ×2
python ×2
airflow ×1
docker ×1
mwaa ×1
python-3.9 ×1
python-3.x ×1
sql ×1