相关疑难解决方法(0)

重新创建BigQuery表后,流插入不起作用?

我刚刚遇到了BigQuery的一个有趣的问题.

基本上有一个批处理作业在BigQuery中重新创建一个表 - 删除数据 - 然后立即开始通过流接口提供新的集合.

曾经这样工作很长一段时间 - 成功.

最近它开始松散数据.

一个小的测试用例已经确认了这种情况 - 如果数据源在重新创建(成功!)表后立即启动,则数据集的某些部分将丢失.即在被输入的4000条记录中,只有2100 - 3500可以通过.

我怀疑在表操作(删除和创建)在整个环境中成功传播之前,表创建可能会返回成功,因此数据集的第一部分将被提供给表的旧副本(在此推测).

为了确认这一点,我在表创建和启动数据馈送之间设置了超时.实际上,如果超时小于120秒 - 部分数据集将丢失.

如果超过120秒 - 似乎工作正常.

以前没有要求超时.我们正在使用美国BigQuery.我错过了一些明显的东西吗?

编辑:从下面的肖恩陈提供的评论和一些其他来源 - 由于表的缓存方式和内部表id传播到整个系统的方式,行为是预期的.BigQuery是为仅附加类型的操作而构建的.重写不是人们可以容易地适应设计的东西,应该避免.

google-bigquery

5
推荐指数
1
解决办法
1289
查看次数

流式传输之前的BigQuery表截断不起作用

我们使用BigQuery Python API来运行一些分析.为此,我们创建了以下适配器:

def stream_data(self, table, data, schema, how=None):
    r = self.connector.tables().list(projectId=self._project_id,
                                     datasetId='lbanor').execute()
    table_exists = [row['tableReference']['tableId'] for row in
                    r['tables'] if
                    row['tableReference']['tableId'] == table]
    if table_exists:
        if how == 'WRITE_TRUNCATE':
            self.connector.tables().delete(projectId=self._project_id,
                                           datasetId='lbanor',
                                           tableId=table).execute()
            body = {
                'tableReference': {
                    'tableId': table,
                    'projectId': self._project_id,
                    'datasetId': 'lbanor'
                },
                'schema': schema
            }
            self.connector.tables().insert(projectId=(
                                           self._project_id),
                                           datasetId='lbanor',
                                           body=body).execute()
    else:
        body = {
            'tableReference': {
                'tableId': table,
                'projectId': self._project_id,
                'datasetId': 'lbanor'
            },
            'schema': schema
        }
        self.connector.tables().insert(projectId=(
                                       self._project_id),
                                       datasetId='lbanor',
                                       body=body).execute()

    body = {
        'rows': [
            {
                'json': data, …
Run Code Online (Sandbox Code Playgroud)

python google-bigquery

3
推荐指数
1
解决办法
817
查看次数

标签 统计

google-bigquery ×2

python ×1