Ger*_*lan 2 python postgresql etl amazon-s3 amazon-rds
我正在尝试使用 Python 将大量 JSON 文件从 Amazon S3 导入到 AWS RDS-PostgreSQL 中。但是,这些错误发生了,
回溯(最近一次调用最后一次):
文件“my_code.py”,第 67 行,位于
file_content = obj['Body'].read().decode('utf-8').splitlines(True)
文件“/home/user/asd-to-qwe/fgh-to-hjk/env/local/lib/python3.6/site-packages/botocore/response.py”,第 76 行,读取
块= self._raw_stream.read(amt)
文件“/home/user/asd-to-qwe/fgh-to-hjk/env/local/lib/python3.6/site-packages/botocore/vendored/requests/packages/urllib3/response.py”,第239行,在读
数据 = self._fp.read()
文件“/usr/lib64/python3.6/http/client.py”,第 462 行,读取
s = self._safe_read(self.length)
文件“/usr/lib64/python3.6/http/client.py”,第 617 行,在 _safe_read 中
返回 b"".join(s)
内存错误
// my_code.py
import sys
import boto3
import psycopg2
import zipfile
import io
import json
s3 = boto3.client('s3', aws_access_key_id=<aws_access_key_id>, aws_secret_access_key=<aws_secret_access_key>)
connection = psycopg2.connect(host=<host>, dbname=<dbname>, user=<user>, password=<password>)
cursor = connection.cursor()
bucket = sys.argv[1]
key = sys.argv[2]
obj = s3.get_object(Bucket=bucket, Key=key)
def insert_query(data):
query = """
INSERT INTO data_table
SELECT
(src.test->>'url')::varchar, (src.test->>'id')::bigint,
(src.test->>'external_id')::bigint, (src.test->>'via')::jsonb
FROM (SELECT CAST(%s AS JSONB) AS test) src
"""
cursor.execute(query, (json.dumps(data),))
if key.endswith('.zip'):
zip_files = obj['Body'].read()
with io.BytesIO(zip_files) as zf:
zf.seek(0)
with zipfile.ZipFile(zf, mode='r') as z:
for filename in z.namelist():
with z.open(filename) as f:
for line in f:
insert_query(json.loads(line.decode('utf-8')))
if key.endswith('.json'):
file_content = obj['Body'].read().decode('utf-8').splitlines(True)
for line in file_content:
insert_query(json.loads(line))
connection.commit()
connection.close()
Run Code Online (Sandbox Code Playgroud)
这些问题有解决办法吗?任何帮助都可以,非常感谢!
通过避免将整个输入文件作为一行放入内存中,可以节省大量费用list。
具体来说,这些行对内存使用量来说非常糟糕,因为它们涉及bytes整个文件大小的对象的峰值内存使用量,以及list包含文件完整内容的行:
file_content = obj['Body'].read().decode('utf-8').splitlines(True)
for line in file_content:
Run Code Online (Sandbox Code Playgroud)
对于包含 500 万行的 1 GB ASCII 文本文件,在 64 位 Python 3.3+ 上,仅对象bytes、和中的list单个s的峰值内存需求约为 2.3 GB 。需要 2.3 倍 RAM 为其处理文件大小的程序无法扩展到大文件。strlist
要修复此问题,请将原始代码更改为:
file_content = io.TextIOWrapper(obj['Body'], encoding='utf-8')
for line in file_content:
Run Code Online (Sandbox Code Playgroud)
鉴于这obj['Body']似乎可用于延迟流,这应该从内存中删除完整文件数据的两个副本。使用TextIOWrapper方法obj['Body']会以块(一次几KB)的形式延迟读取和解码,并且行也会延迟迭代;这将内存需求减少到一个小的、基本固定的量(峰值内存成本将取决于最长行的长度),无论文件大小如何。
更新:
看来StreamingBody没有实现io.BufferedIOBaseABC。不过,它确实有自己记录的 API,可用于类似的目的。如果您无法让其TextIOWrapper为您完成工作(如果可以使其工作,则会更加高效和简单),另一种选择是:
file_content = (line.decode('utf-8') for line in obj['Body'].iter_lines())
for line in file_content:
Run Code Online (Sandbox Code Playgroud)
与 using 不同TextIOWrapper,它不会受益于块的批量解码(每行单独解码),但在减少内存使用方面它仍然应该实现相同的好处。
| 归档时间: |
|
| 查看次数: |
5189 次 |
| 最近记录: |