我想使用他们在此处描述的技术从 Flask 流式传输 CSV :
from flask import Response
@app.route('/large.csv')
def generate_large_csv():
def generate():
for row in iter_all_rows():
yield ','.join(row) + '\n'
return Response(generate(), mimetype='text/csv')
Run Code Online (Sandbox Code Playgroud)
我有一个来自 sqlalchemy 的查询,它返回一个 Purchase 对象列表。我想将其写入 CSV,理想情况下可以自定义输出到文件的属性。
不幸的是,我目前得到一个空白的 CSV 作为输出:
@app.route('/sales_export.csv')
@login_required
def sales_export():
""" Export a CSV of all sales data """
def generate():
count = 0
fieldnames = [
'uuid',
'recipient_name',
'recipient_email',
'shipping_street_address_1',
'shipping_street_address_2',
'shipping_city',
'shipping_state',
'shipping_zip',
'purchaser_name',
'purchaser_email',
'personal_message',
'sold_at'
]
for i, row in enumerate(Purchase.query.all()):
if i == 0:
yield fieldnames
csv = ','.join(row) + '\n'
yield csv
return Response(generate(), mimetype='text/csv')
Run Code Online (Sandbox Code Playgroud)
我哪里错了?
这是我解决这个问题的方法。它没有使用我最初好奇的流媒体功能。我仍然对使用该技术的解决方案感兴趣。
@app.route('/download')
@login_required
def download():
"Export a CSV of all sales data"
purchases = Purchase.query.all()
csvfile = cStringIO.StringIO()
headers = [
'uuid',
'recipient_name',
'recipient_email',
'shipping_street_address_1',
'shipping_street_address_2',
'shipping_city',
'shipping_state',
'shipping_zip',
'purchaser_name',
'purchaser_email',
'personal_message',
'sold_at',
'coupon_used'
]
rows = []
for purchase in Purchase.query.all():
rows.append(
{
'uuid': purchase.uuid,
'recipient_name': purchase.recipient_name,
'recipient_email': purchase.recipient_email,
'shipping_street_address_1': purchase.shipping_street_address_1,
'shipping_street_address_2': purchase.shipping_street_address_2,
'shipping_city': purchase.shipping_city,
'shipping_state': purchase.shipping_state,
'shipping_zip': purchase.shipping_zip,
'purchaser_name': purchase.purchaser_name,
'purchaser_email': purchase.purchaser_email,
'personal_message': purchase.personal_message,
'sold_at': purchase.sold_at.strftime('%c'),
'coupon_used': purchase.coupon_used
}
)
writer = csv.DictWriter(csvfile, headers)
writer.writeheader()
for row in rows:
writer.writerow(
dict(
(k, v.encode('utf-8') if type(v) is unicode else v) for k, v in row.iteritems()
)
)
csvfile.seek(0)
return send_file(csvfile, attachment_filename='sales_export.csv', as_attachment=True)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3464 次 |
| 最近记录: |