Python-如何-大查询异步任务

Ant*_*rps 5 python async-await google-bigquery

这可能是一个虚拟的问题,但我似乎无法异步运行python google-clood-bigquery。

我的目标是同时运行多个查询,并在asyncio.wait()查询收集器中等待所有查询完成。我正在asyncio.create_tast()用来启动查询。问题是每个查询在开始之前都等待先例的完成。

这是我的查询功能(非常简单):

async def exec_query(self, query, **kwargs) -> bigquery.table.RowIterator:
  job = self.api.query(query, **kwargs)
  return job.result()
Run Code Online (Sandbox Code Playgroud)

既然我不能等待job.result(),我应该等待其他东西吗?

Wil*_*uks 10

如果您正在a内部进行操作,coroutine并且希望在不阻塞的情况下运行其他查询,event_loop则可以使用该run_in_executor函数,该函数基本上在后台线程中运行查询而不会阻塞循环。这是一个很好的例子。

确保这正是您所需要的;为在Python API中运行查询而创建的作业已经是异步的,并且仅在您调用时阻塞job.result()。这意味着asyncio除非您位于协程内部,否则无需使用。

这是一个在完成工作后立即检索结果的快速示例:

from concurrent.futures import ThreadPoolExecutor, as_completed
import google.cloud.bigquery as bq


client = bq.Client.from_service_account_json('path/to/key.json')
query1 = 'SELECT 1'
query2 = 'SELECT 2'

threads = []
results = []

executor = ThreadPoolExecutor(5)

for job in [client.query(query1), client.query(query2)]:
    threads.append(executor.submit(job.result))

# Here you can run any code you like. The interpreter is free

for future in as_completed(threads):
    results.append(list(future.result()))
Run Code Online (Sandbox Code Playgroud)

results 将会:

[[Row((2,), {'f0_': 0})], [Row((1,), {'f0_': 0})]]
Run Code Online (Sandbox Code Playgroud)


dka*_*tan 6

只是分享一个不同的解决方案:

import numpy as np
from time import sleep


query1 = """
SELECT
  language.name,
  average(language.bytes)
FROM `bigquery-public-data.github_repos.languages` 
, UNNEST(language) AS language
GROUP BY language.name"""
query2 = 'SELECT 2'


def dummy_callback(future):
    global jobs_done
    jobs_done[future.job_id] = True


jobs = [bq.query(query1), bq.query(query2)]
jobs_done = {job.job_id: False for job in jobs}
[job.add_done_callback(dummy_callback) for job in jobs]

# blocking loop to wait for jobs to finish
while not (np.all(list(jobs_done.values()))):
    print('waiting for jobs to finish ... sleeping for 1s')
    sleep(1)

print('all jobs done, do your stuff')
Run Code Online (Sandbox Code Playgroud)

我更as_completed喜欢使用 bigquery 作业本身的内置异步功能,而不是使用。这也使我可以将数据管道分解为单独的云功能,而不必在ThreadPoolExecutor整个管道的持续时间内保持主要功能。顺便说一句,这就是我研究这个问题的原因:我的管道比 Cloud Functions 的最大超时 9 分钟(甚至 Cloud Run 的 15 分钟)更长。

缺点是我需要跟踪job_id各个函数中的所有 s,但是在通过指定输入和输出来配置管道时相对容易解决,以便它们形成有向无环图。