新的更新在最后。
我们正在用 JavaScript 为浏览器创建 BigQuery 存储库。
我们面临的问题是我们从https://bigquerystorage.googleapis.com/以下位置获得 404 错误:
看起来像是 CORS 问题,但实际上是 404 问题:
$> curl 'https://bigquerystorage.googleapis.com/google.cloud.bigquery.storage.v1beta1.BigQueryStorage/CreateReadSession' \
-H 'x-goog-request-params: table_reference.project_id=project&table_reference.dataset_id=dataset' \
-H 'X-User-Agent: grpc-web-javascript/0.1' -H 'DNT: 1' -H 'Content-Type: application/grpc-web-text' \
-H 'Accept: application/grpc-web-text' -H 'X-Grpc-Web: 1' -H 'Sec-Fetch-Dest: empty' \
--data-binary 'ydG9k_binary_data_oATgB' --compressed
<!DOCTYPE html>
<html lang=en>
<meta charset=utf-8>
<meta name=viewport content="initial-scale=1, minimum-scale=1, width=device-width">
<title>Error 404 (Not Found)!!1</title>
...
Run Code Online (Sandbox Code Playgroud)
知道请求发生了什么吗?
这是我们正在使用的所有资源。这是基本结构:
原始.proto文件来自:
src/proto/google/api/: https://github.com/googleapis/googleapissrc/proto/google/cloud/bigquery/storage/v1beta1/:https : //github.com/googleapis/python-bigquery-storagesrc/proto/google/protobuf/: https://github.com/protocolbuffers/protobuf为了编译 …
我正在使用SubDAG在Airflow中创建动态DAG。我需要的是SubDAG内部的任务数由上一个任务的结果确定(subtask_ids该middle_section函数的变量应该与该函数的变量相同initial_task)。
问题是我无法访问xcoma的subdag函数,SubDagOperator因为我没有任何上下文。另外,由于调度程序的自动发现DAG功能,我无法到达任何数据库以读取某些值:middle_section每隔几秒钟执行一次。
你们如何解决这个问题?根据先前任务的结果在SubDAG中创建动态数量的任务?
这是我正在开发的代码:
import airflow
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2),
}
def initial_task(**context):
subtask_ids = [0, 1, 2]
task_instance = context['ti']
task_instance.xcom_push(key='depot_ids', value=subtask_ids)
def middle_section_task(subtask_id):
print(subtask_id)
def middle_section(parent_dag, arg):
subdag = DAG(dag_id=f'{dag.dag_id}.middle',
default_args=args, schedule_interval='@once')
subtask_ids = '' # Read from xcom
for subtask_id in subtask_ids:
PythonOperator(task_id=f'{dag.dag_id}.middle_section_task_{subtask_id}',
python_callable=middle_section_task,
op_kwargs={'subtask_id': subtask_id}, dag=subdag)
return subdag
def …Run Code Online (Sandbox Code Playgroud) 查询BigTable时,我们遇到了一些性能问题。
查询约1400行时,监控仪表板中将获得约500行/秒的速度,而在与BigTable实例位于同一区域的计算机中运行下一个代码片段时,则需要约1.6秒:
partial_rows = instance.table(table_name).read_rows(filter_=row_filter, row_set=row_set)
ids = {}
for row in partial_rows:
key = row.row_key.decode()
category = key[0:5]
id_, year_month = key[5:].split('_')
dates = ids.get(id_, {})
for k, v in row.cells[column_family].items():
sub_key = k.decode()
day = sub_key[0:2]
measurement = f'{category}_{sub_key[3:]}'
date = datetime.date(int(year_month[0:4]), int(year_month[4:6]), int(day))
value = struct.unpack('>i', v[0].value)[0]
measurements = dates.get(date, {})
measurements[measurement] = value
dates[date] = measurements
ids[id_] = dates
Run Code Online (Sandbox Code Playgroud)
我们使用的表模式是:
{category}{id}_{year}{month}{day}{measurement_name}在我们的案例中,此架构完全遵循BigTable的准则。
该代码段非常简单,我们使用键和列名执行一些操作以创建如下所示的ids字典:
{
"4326": {
"2019-01-01": {
"value_a": 49
}, …Run Code Online (Sandbox Code Playgroud)