小编jos*_*azo的帖子

BigQuery Storage gRPC-Web:创建读取会话时出现 404 错误

新的更新在最后。

我们正在用 JavaScript 为浏览器创建 BigQuery 存储库。

我们面临的问题是我们从https://bigquerystorage.googleapis.com/以下位置获得 404 错误:

404伪装成CORS

看起来像是 CORS 问题,但实际上是 404 问题:

$> curl 'https://bigquerystorage.googleapis.com/google.cloud.bigquery.storage.v1beta1.BigQueryStorage/CreateReadSession' \
       -H 'x-goog-request-params: table_reference.project_id=project&table_reference.dataset_id=dataset' \
       -H 'X-User-Agent: grpc-web-javascript/0.1' -H 'DNT: 1' -H 'Content-Type: application/grpc-web-text' \
       -H 'Accept: application/grpc-web-text' -H 'X-Grpc-Web: 1' -H 'Sec-Fetch-Dest: empty' \
       --data-binary 'ydG9k_binary_data_oATgB' --compressed

<!DOCTYPE html>
<html lang=en>
  <meta charset=utf-8>
  <meta name=viewport content="initial-scale=1, minimum-scale=1, width=device-width">
  <title>Error 404 (Not Found)!!1</title>
...
Run Code Online (Sandbox Code Playgroud)

知道请求发生了什么吗?

这是我们正在使用的所有资源。这是基本结构:

项目结构

原始.proto文件来自:

为了编译 …

javascript google-bigquery grpc

6
推荐指数
0
解决办法
253
查看次数

如何根据上一个任务的结果在SubDAG中真正创建n个任务

我正在使用SubDAG在Airflow中创建动态DAG。我需要的是SubDAG内部的任务数由上一个任务的结果确定(subtask_idsmiddle_section函数的变量应该与该函数的变量相同initial_task)。

问题是我无法访问xcoma的subdag函数,SubDagOperator因为我没有任何上下文。另外,由于调度程序的自动发现DAG功能,我无法到达任何数据库以读取某些值:middle_section每隔几秒钟执行一次。

你们如何解决这个问题?根据先前任务的结果在SubDAG中创建动态数量的任务?

这是我正在开发的代码:

import airflow
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
}


def initial_task(**context):
    subtask_ids = [0, 1, 2]
    task_instance = context['ti']
    task_instance.xcom_push(key='depot_ids', value=subtask_ids)


def middle_section_task(subtask_id):
    print(subtask_id)


def middle_section(parent_dag, arg):
    subdag = DAG(dag_id=f'{dag.dag_id}.middle',
                 default_args=args, schedule_interval='@once')

    subtask_ids = ''  # Read from xcom

    for subtask_id in subtask_ids:
        PythonOperator(task_id=f'{dag.dag_id}.middle_section_task_{subtask_id}',
                       python_callable=middle_section_task,
                       op_kwargs={'subtask_id': subtask_id}, dag=subdag)

    return subdag


def …
Run Code Online (Sandbox Code Playgroud)

python airflow

5
推荐指数
1
解决办法
1033
查看次数

BigTable:使用Python客户端提高BigTable查询的性能

查询BigTable时,我们遇到了一些性能问题。

查询约1400行时,监控仪表板中将获得约500行/秒的速度,而在与BigTable实例位于同一区域的计算机中运行下一个代码片段时,则需要约1.6秒:

partial_rows = instance.table(table_name).read_rows(filter_=row_filter, row_set=row_set)

ids = {}
for row in partial_rows:
    key = row.row_key.decode()
    category = key[0:5]
    id_, year_month = key[5:].split('_')
    dates = ids.get(id_, {})

    for k, v in row.cells[column_family].items():
        sub_key = k.decode()
        day = sub_key[0:2]
        measurement = f'{category}_{sub_key[3:]}'

        date = datetime.date(int(year_month[0:4]), int(year_month[4:6]), int(day))
        value = struct.unpack('>i', v[0].value)[0]

        measurements = dates.get(date, {})
        measurements[measurement] = value
        dates[date] = measurements

    ids[id_] = dates
Run Code Online (Sandbox Code Playgroud)

我们使用的表模式是:

  • 行键: {category}{id}_{year}{month}
  • 柱: {day}{measurement_name}

在我们的案例中,此架构完全遵循BigTable的准则。

该代码段非常简单,我们使用键和列名执行一些操作以创建如下所示的ids字典:

{
  "4326": {
    "2019-01-01": {
      "value_a": 49
    }, …
Run Code Online (Sandbox Code Playgroud)

python google-cloud-platform google-cloud-bigtable

5
推荐指数
0
解决办法
83
查看次数