Chr*_*ock 5 python csv stream azure
StorageStreamDownloader
使用 azure.storage.blob 包很简单:
from azure.storage.blob import BlobServiceClient
blob_service_client = BlobServiceClient.from_connection_string("my azure connection string")
container_client = blob_service_client.get_container_client("my azure container name")
blob_client = container_client.get_blob_client("my azure file name")
storage_stream_downloader = blob_client.download_blob()
Run Code Online (Sandbox Code Playgroud)
处理类似文件的对象很简单,或者更具体地说,我认为,处理 csv 包中的字符串返回迭代器(或对象的文件路径):
import csv
from io import StringIO
csv_string = """col1, col2
a,b
c,d"""
with StringIO(csv_string) as csv_file:
for row in csv.reader(csv_file):
print(row) # or rather whatever I actually want to do on a row by row basis, e.g. ascertain that the file contains a row that meets a certain condition
Run Code Online (Sandbox Code Playgroud)
我正在努力解决的是从我的流数据StorageStreamDownloader
中获取流数据csv.reader()
,以便我可以在每一行到达时对其进行处理,而不是等待整个文件下载。
微软文档让我觉得有点符合他们的标准(该chunks()
方法没有注释?),但我看到有一种readinto()
读取流的方法。我尝试读入流,但无法弄清楚如何在不将缓冲区输出到新文件并读取该文件的情况下BytesIO
将数据取出。这一切都让我觉得这是一件应该可行的事情,但我可能在概念上遗漏了一些明显的东西,也许与orcsv.reader()
有关,或者也许我只是使用了错误的 csv 工具来满足我的需求?itertools
asyncio
基于 Jim Xu 的评论:
stream = blob_client.download_blob()
with io.BytesIO() as buf:
stream.readinto(buf)
# needed to reset the buffer, otherwise, panda won't read from the start
buf.seek(0)
data = pd.read_csv(buf)
Run Code Online (Sandbox Code Playgroud)
或者
csv_content = blob_client.download_blob().readall()
data = pd.read_csv(io.BytesIO(csv_content ))
Run Code Online (Sandbox Code Playgroud)
如果你想逐行读取csv文件,可以使用方法pd.read_csv(filename, chunksize=1)
。欲了解更多详情,请参阅这里和这里
例如(我使用pandas1.2.1)
with pd.read_csv(content, chunksize=1) as reader:
for chunk in reader:
print(chunk)
print('---------------')
Run Code Online (Sandbox Code Playgroud)
另外,如果要使用该方法chunks()
,我们需要在创建时将max_chunk_get_size
和设为相同的值。欲了解更多详情,请参阅这里和这里max_single_get_size
BlobClient
例如
from azure.storage.blob import BlobClient
key = '<account_key>'
blob_client = BlobClient(account_url='https://andyprivate.blob.core.windows.net',
container_name='input',
blob_name='cities.csv',
credential=key,
max_chunk_get_size=1024,
max_single_get_size=1024)
stream = blob_client.download_blob()
for chunk in stream.chunks():
print(len(chunk))
Run Code Online (Sandbox Code Playgroud)