在堆栈溢出之前我已经看过这个问题的答案(/sf/ask/2098853501/),但不是因为apache beam为python添加了可拆分的dofn功能.在将文件模式传递给gcs存储桶时,如何访问当前正在处理的文件的文件名?
我想将文件名传递给我的转换函数:
with beam.Pipeline(options=pipeline_options) as p:
lines = p | ReadFromText('gs://url to file')
data = (
lines
| 'Jsonify' >> beam.Map(jsonify)
| 'Unnest' >> beam.FlatMap(unnest)
| 'Write to BQ' >> beam.io.Write(beam.io.BigQuerySink(
'project_id:dataset_id.table_name', schema=schema,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
Run Code Online (Sandbox Code Playgroud)
最后,我想要做的是在转换json的每一行时将文件名传递给我的转换函数(请参阅此内容,然后使用文件名在不同的BQ表中进行查找以获取值).我想一旦我设法知道如何获取文件名,我将能够找出侧输入部分,以便在bq表中进行查找并获得唯一值.
python google-bigquery google-cloud-platform google-cloud-dataflow apache-beam
我需要从GCS存储桶中读取文件.我知道我将不得不使用GCS API /客户端库,但我找不到任何与之相关的示例.
我一直在参考GCS文档中的这个链接: GCS客户端库.但真的不能成功.如果有人能提供一个真正有用的例子.谢谢.