如何直接将查询结果写入Google云端存储桶?

bob*_*ary 3 python google-cloud-storage google-bigquery google-cloud-platform

from google.cloud import bigquery  
query = """ select * from emp where emp_name=@emp_name""" 
query_params = [bigquery.ScalarQueryParameter('emp_name', 'STRING', 'name')] 
job_config = bigquery.QueryJobConfig() 
job_config.query_parameters = query_params  
client = bigquery.Client() 
query_job = client.query(query, job_config=job_config) 
result = query_job.result()
Run Code Online (Sandbox Code Playgroud)

如何将结果写入Google云端存储,而不是将其写入CSV并将其上传到云存储桶?

Gra*_*ley 5

BigQuery不支持将其查询结果直接写入GCS.您必须将结果写入表中,然后在表格实现后将表导出到GCS.您可以使用Cloud Composer为您编排此功能.

或者,您可以使用Dataflow管道一次性实现所需的结果.但这是一项更多的工作,将花费更多的钱.我们的想法是使用您的SQL查询编写一个从BigQuery读取的管道,然后将结果写入GCS.它也会慢一些.


dse*_*sto 5

根据您的特定用例(出口的频率,出口的大小等),@ GrahamPolley的答案中提出的解决方案可能对您有用,尽管它们需要更多的开发和关注。

当前写入查询结果的可能性是将结果写入表或在本地下载,甚至直接下载到CSV也有一些限制。因此,不可能将查询结果直接以CSV格式写入GCS。但是,有两个步骤的解决方案,其中包括:

  1. 将查询结果写入BQ表
  2. 将数据从BQ表导出到GCS中的CSV文件。请注意,此功能也有一些限制,但并没有那么狭窄。

以下Python代码可以使您了解如何执行该任务:

from google.cloud import bigquery
client = bigquery.Client()

# Write query results to a new table
job_config = bigquery.QueryJobConfig()
table_ref = client.dataset("DATASET").table("TABLE")
job_config.destination = table_ref
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE

query_job = client.query(
    'SELECT name FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 10',
    location='US', # Location must match dataset
    job_config=job_config)
rows = list(query_job)  # Waits for the query to finish


# Export table to GCS
destination_uri = "gs://BUCKET/FILE.CSV"
dataset_ref = client.dataset("DATASET", project="PROJECT_ID")
table_ref = dataset_ref.table("TABLE")

extract_job = client.extract_table(
    table_ref,
    destination_uri,
    location='US')
extract_job.result()  # Waits for job to complete
Run Code Online (Sandbox Code Playgroud)

请注意,在那之后,您将必须删除该表(也可以通过编程方式执行此操作)。如果您必须使流程自动化(如果这是您的用例,也许您应该更好地使用@Graham的解决方案),这可能不是最佳解决方案,但是它可以解决一个简单的问题。


Por*_*Kev 5

@dsesto 的回答对我很有用。我使用他的代码并添加了一些额外的行来查询 BigQuery,将结果写入表,然后导出到 GCS 并将结果导入到 Dask DataFrame。代码被包装成一个函数。

def df_from_bq(query:str,table=None,compute=False):

from time import gmtime, strftime
from google.cloud import bigquery#y, storage 
import dask.dataframe as dd
import gcsfs

client = bigquery.Client.from_service_account_json('YOUR_PATH') #Authentication if BQ using ServiceKey
project = 'YOUR_PROJECT'

table_name = 'result_'+str(strftime("%Y%m%d_%H%M%S", gmtime())) if table==None else table #Creates custome table name if no name is defined

job_config = bigquery.QueryJobConfig()
table_ref = client.dataset("YOUR_DATASET").table(table_name)
job_config.destination = table_ref
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE #Creates the table with query result. Overwrites it if the table exists

query_job = client.query(
    query,
    location='US', 
    job_config=job_config)
query_job.result() 
print('Query results loaded to table {}'.format(table_ref.path))

destination_uri = "gs://YOUR_BUCKET/{}".format(table_name+'_*'+'.csv') 
dataset_ref = client.dataset("YOUR_DATASET", project=project)
table_ref = dataset_ref.table(table_name)

extract_job = client.extract_table(
    table_ref,
    destination_uri,
    location='US') 
extract_job.result() #Extracts results to the GCS

print('Query results extracted to GCS: {}'.format(destination_uri))

client.delete_table(table_ref) #Deletes table in BQ

print('Table {} deleted'.format(table_name))

gcs = gcsfs.GCSFileSystem(project=project, token='cache') 
df = dd.read_csv('gcs://YOUR_BUCKET/{}'.format(table_name+'_*'+'.csv'),  storage_options={'token': gcs.session.credentials})

#storage_client = storage.Client.from_service_account_json('C:\\Users\o.korshun\Documents\o.korshun.json')
#bucket = storage_client.get_bucket('plarium-analytics')
#blob = bucket.blob(table_name+'.csv')
#blob.delete() #Uncomment if you need to delete Blob after the DataFrame is created

#print('Blob {} deleted'.format(table_name+'.csv'))
print('Results imported to DD!')

return df if compute == False else df.compute().reset_index(in_place=True)
Run Code Online (Sandbox Code Playgroud)

需要注意的是,BQ 中的 Table 会在结果导入到 Cloud Storage 后被删除。