Ant*_*rps 5 python async-await google-bigquery
这可能是一个虚拟的问题,但我似乎无法异步运行python google-clood-bigquery。
我的目标是同时运行多个查询,并在asyncio.wait()
查询收集器中等待所有查询完成。我正在asyncio.create_tast()
用来启动查询。问题是每个查询在开始之前都等待先例的完成。
这是我的查询功能(非常简单):
async def exec_query(self, query, **kwargs) -> bigquery.table.RowIterator:
job = self.api.query(query, **kwargs)
return job.result()
Run Code Online (Sandbox Code Playgroud)
既然我不能等待job.result()
,我应该等待其他东西吗?
Wil*_*uks 10
如果您正在a内部进行操作,coroutine
并且希望在不阻塞的情况下运行其他查询,event_loop
则可以使用该run_in_executor
函数,该函数基本上在后台线程中运行查询而不会阻塞循环。这是一个很好的例子。
确保这正是您所需要的;为在Python API中运行查询而创建的作业已经是异步的,并且仅在您调用时阻塞job.result()
。这意味着asyncio
除非您位于协程内部,否则无需使用。
这是一个在完成工作后立即检索结果的快速示例:
from concurrent.futures import ThreadPoolExecutor, as_completed
import google.cloud.bigquery as bq
client = bq.Client.from_service_account_json('path/to/key.json')
query1 = 'SELECT 1'
query2 = 'SELECT 2'
threads = []
results = []
executor = ThreadPoolExecutor(5)
for job in [client.query(query1), client.query(query2)]:
threads.append(executor.submit(job.result))
# Here you can run any code you like. The interpreter is free
for future in as_completed(threads):
results.append(list(future.result()))
Run Code Online (Sandbox Code Playgroud)
results
将会:
[[Row((2,), {'f0_': 0})], [Row((1,), {'f0_': 0})]]
Run Code Online (Sandbox Code Playgroud)
只是分享一个不同的解决方案:
import numpy as np
from time import sleep
query1 = """
SELECT
language.name,
average(language.bytes)
FROM `bigquery-public-data.github_repos.languages`
, UNNEST(language) AS language
GROUP BY language.name"""
query2 = 'SELECT 2'
def dummy_callback(future):
global jobs_done
jobs_done[future.job_id] = True
jobs = [bq.query(query1), bq.query(query2)]
jobs_done = {job.job_id: False for job in jobs}
[job.add_done_callback(dummy_callback) for job in jobs]
# blocking loop to wait for jobs to finish
while not (np.all(list(jobs_done.values()))):
print('waiting for jobs to finish ... sleeping for 1s')
sleep(1)
print('all jobs done, do your stuff')
Run Code Online (Sandbox Code Playgroud)
我更as_completed
喜欢使用 bigquery 作业本身的内置异步功能,而不是使用。这也使我可以将数据管道分解为单独的云功能,而不必在ThreadPoolExecutor
整个管道的持续时间内保持主要功能。顺便说一句,这就是我研究这个问题的原因:我的管道比 Cloud Functions 的最大超时 9 分钟(甚至 Cloud Run 的 15 分钟)更长。
缺点是我需要跟踪job_id
各个函数中的所有 s,但是在通过指定输入和输出来配置管道时相对容易解决,以便它们形成有向无环图。
归档时间: |
|
查看次数: |
1421 次 |
最近记录: |