Ale*_*lex 3 google-bigquery google-cloud-dataflow apache-beam
我正在运行一个简单的数据流作业来从表中读取数据并将数据写回到另一个表中。作业失败并出现以下错误:
工作流程失败。原因:S01:ReadFromBQ+WriteToBigQuery/WriteToBigQuery/NativeWrite 失败。、BigQuery 在项目“[我的项目]”中创建数据集“_dataflow_temp_dataset_18172136482196219053”失败。、BigQuery 执行失败。、错误:消息:访问被拒绝:项目 [我的项目]:用户在项目 [我的项目] 中没有 bigquery.datasets.create 权限。
不过,我并没有尝试创建任何数据集,它基本上是尝试创建 temp_dataset,因为作业失败了。但我没有得到任何关于幕后真正错误的信息。阅读不是问题,真正失败的是写作步骤。我不认为这与权限有关,但我的问题更多的是如何获得真正的错误,而不是这个错误。知道如何解决这个问题吗?
这是代码:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, WorkerOptions
from sys import argv
options = PipelineOptions(flags=argv)
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = "prj"
google_cloud_options.job_name = 'test'
google_cloud_options.service_account_email = "mysa"
google_cloud_options.staging_location = 'gs://'
google_cloud_options.temp_location = 'gs://'
options.view_as(StandardOptions).runner = 'DataflowRunner'
worker_options = options.view_as(WorkerOptions)
worker_options.subnetwork = 'subnet'
with beam.Pipeline(options=options) as p:
query = "SELECT ..."
bq_source = beam.io.BigQuerySource(query=query, use_standard_sql=True)
bq_data = p | "ReadFromBQ" >> beam.io.Read(bq_source)
table_schema = ...
bq_data | beam.io.WriteToBigQuery(
project="prj",
dataset="test",
table="test",
schema=table_schema,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
)
Run Code Online (Sandbox Code Playgroud)
使用 BigQuerySource 时,SDK 会创建临时数据集并将查询的输出存储到临时表中。然后,它从该临时表发出导出以从中读取结果。
因此,创建此 temp_dataset 是预期的行为。这意味着它可能没有隐藏错误。
这没有很好的记录,但可以通过以下读取调用在 BigQuerySource 的实现中看到:BigQuerySource.reader() --> BigQueryReader() --> BigQueryReader().__iter__() --> BigQueryWrapper.run_query( ) --> BigQueryWrapper._start_query_job()。
| 归档时间: |
|
| 查看次数: |
2472 次 |
| 最近记录: |