Ray*_*oal 5 python psycopg2 python-requests amazon-redshift
我正在使用Python的requests库访问web服务,端点正在返回一个(非常大的)CSV文件,然后我想将其流式传输到数据库中.代码如下所示:
response = requests.get(url, auth=auth, stream=True)
if response.status_code == 200:
stream_csv_into_database(response)
Run Code Online (Sandbox Code Playgroud)
现在,当数据库是MongoDB数据库时,加载完美地使用DictReader:
def stream_csv_into_database(response):
.
.
.
for record in csv.DictReader(response.iter_lines(), delimiter='\t'):
product_count += 1
product = {k:v for (k,v) in record.iteritems() if v}
product['_id'] = product_count
collection.insert(product)
Run Code Online (Sandbox Code Playgroud)
但是,我正在从MongoDB切换到Amazon RedShift,我已经可以使用它进行访问了psycopg2.我可以打开连接并简单地进行简单查询,但我想要做的是使用来自webservice的流式响应并使用psycopg2 copy_expert来加载RedShift表.这是我到目前为止尝试的内容:
def stream_csv_into_database(response, campaign, config):
print 'Loading product feed for {0}'.format(campaign)
conn = new_redshift_connection(config) # My own helper, works fine.
table = 'products.' + campaign
cur = conn.cursor()
reader = response.iter_lines()
# Error on following line:
cur.copy_expert("COPY {0} FROM STDIN WITH CSV HEADER DELIMITER '\t'".format(table), reader)
conn.commit()
cur.close()
conn.close()
Run Code Online (Sandbox Code Playgroud)
我得到的错误是:
file必须是COPY FROM的可读文件对象; COPY TO的可写文件对象.
我明白错误在说什么; 事实上,我可以从psycopg2文档中看到copy_expert调用copy_from,其中:
从类似文件的对象读取数据,将它们附加到数据库表(COPY表FROM文件语法).源文件必须同时具有read()和readline()方法.
我的问题是我找不到使response对象成为类文件对象的方法!我都尝试.data和.iter_lines没有成功.我当然不想从webservice下载整个multi-gigabyte文件,然后将其上传到RedShift.必须有一种方法可以将流式响应用作psycopg2可以复制到RedShift中的类文件对象.谁知道我错过了什么?
您可以使用response.rawfile object,但要考虑到任何内容编码(例如 GZIP 或 Deflate 压缩)仍将存在,除非您在调用时将decode_content标志设置为,而 psycopg2 不会。True.read()
您可以在raw文件对象上设置标志以将默认值更改为 decompressing-while-reading:
response.raw.decode_content = True
Run Code Online (Sandbox Code Playgroud)
然后将response.raw文件对象用于csv.DictReader().
| 归档时间: |
|
| 查看次数: |
924 次 |
| 最近记录: |