BigQuery Storage API(https://googleapis.github.io/google-cloud-python/latest/bigquery_storage/gapic/v1beta1/api.html)对于从BigQuery表中读取数据几乎比标准BigQuery快10倍,非常有用API。为了使其更快,它支持多个读取流,每个读取流从相关表中读取动态分配的行集。
我的问题是:尽管您可能请求多个流,但是请求后分配的流不在您的控制范围内。因此,我无法启动1个以上的流。
我正在读取的数据包括3列和600万行,如下所示。我将创建到控制台的流总数打印出来。
from google.cloud import bigquery_storage_v1beta1
project_id = 'myproject'
client = bigquery_storage_v1beta1.BigQueryStorageClient()
table_ref = bigquery_storage_v1beta1.types.TableReference()
table_ref.project_id = "bigquery-public-data"
table_ref.dataset_id = "usa_names"
table_ref.table_id = "usa_1910_current"
read_options = bigquery_storage_v1beta1.types.TableReadOptions()
read_options.selected_fields.append("year")
read_options.selected_fields.append("name")
read_options.selected_fields.append("number")
# I request 3 streams to be created!
requested_streams = 3
parent = "projects/{}".format(project_id)
session = client.create_read_session(
table_ref, parent, table_modifiers=modifiers, read_options=read_options,
requested_streams=requested_streams
)
response = client.batch_create_read_session_streams(session, requested_streams)
# I see only 1 stream being created.
print("Streams created: " + str(len(session.streams)))
print("Stream names array: " + …
Run Code Online (Sandbox Code Playgroud) 我一直在进行测试,以比较Google BigQuery Python客户端库与Node JS库相比下载查询结果的速度。似乎,Python库开箱即用地下载数据的速度大约是Javascript Node JS客户端的两倍。为什么?
下面,我提供了两个测试,一个使用Python,一个使用Javascript。我选择了usa_names
BigQuery 的公共数据集作为示例。该usa_1910_current
数据集中的表大约有600万行180Mb
,大小大约为1。我有一个200Mb的光纤下载链接(有关最后一英里的信息)。将数据打包到pandas数据帧中后,大约为1.1Gb(包括Pandas开销)。
Python测试
from google.cloud import bigquery
import time
import pandas as pd
bq_client = bigquery.Client("mydata-1470162410749")
sql = """SELECT * FROM `bigquery-public-data.usa_names.usa_1910_current`"""
job_config = bigquery.QueryJobConfig()
start = time.time()
#---------------------------------------------------
query_job = bq_client.query(
sql,
location='US',
job_config=job_config)
#---------------------------------------------------
end = time.time()
query_time = end-start
start = time.time()
#---------------------------------------------------
rows = list(query_job.result(timeout=30))
df = pd.DataFrame(data=[list(x.values()) for x in rows], columns=list(rows[0].keys()))
#---------------------------------------------------
end = time.time()
iteration_time = end-start …
Run Code Online (Sandbox Code Playgroud)