我刚刚遇到了BigQuery的一个有趣的问题.
基本上有一个批处理作业在BigQuery中重新创建一个表 - 删除数据 - 然后立即开始通过流接口提供新的集合.
曾经这样工作很长一段时间 - 成功.
最近它开始松散数据.
一个小的测试用例已经确认了这种情况 - 如果数据源在重新创建(成功!)表后立即启动,则数据集的某些部分将丢失.即在被输入的4000条记录中,只有2100 - 3500可以通过.
我怀疑在表操作(删除和创建)在整个环境中成功传播之前,表创建可能会返回成功,因此数据集的第一部分将被提供给表的旧副本(在此推测).
为了确认这一点,我在表创建和启动数据馈送之间设置了超时.实际上,如果超时小于120秒 - 部分数据集将丢失.
如果超过120秒 - 似乎工作正常.
以前没有要求超时.我们正在使用美国BigQuery.我错过了一些明显的东西吗?
编辑:从下面的肖恩陈提供的评论和一些其他来源 - 由于表的缓存方式和内部表id传播到整个系统的方式,行为是预期的.BigQuery是为仅附加类型的操作而构建的.重写不是人们可以容易地适应设计的东西,应该避免.
我们使用BigQuery Python API来运行一些分析.为此,我们创建了以下适配器:
def stream_data(self, table, data, schema, how=None):
r = self.connector.tables().list(projectId=self._project_id,
datasetId='lbanor').execute()
table_exists = [row['tableReference']['tableId'] for row in
r['tables'] if
row['tableReference']['tableId'] == table]
if table_exists:
if how == 'WRITE_TRUNCATE':
self.connector.tables().delete(projectId=self._project_id,
datasetId='lbanor',
tableId=table).execute()
body = {
'tableReference': {
'tableId': table,
'projectId': self._project_id,
'datasetId': 'lbanor'
},
'schema': schema
}
self.connector.tables().insert(projectId=(
self._project_id),
datasetId='lbanor',
body=body).execute()
else:
body = {
'tableReference': {
'tableId': table,
'projectId': self._project_id,
'datasetId': 'lbanor'
},
'schema': schema
}
self.connector.tables().insert(projectId=(
self._project_id),
datasetId='lbanor',
body=body).execute()
body = {
'rows': [
{
'json': data, …Run Code Online (Sandbox Code Playgroud)