使用 Google Cloud DataFlow python sdk 读取一组 xml 文件

Tom*_*mer 3 python google-cloud-dataflow

我正在尝试从 GCS 存储桶读取 XML 文件的集合并处理它们,其中集合中的每个元素都是代表整个文件的字符串,但我找不到关于如何完成此操作的合适示例,我也无法理解它来自 Apache Beam 文档,主要是关于 Java 版本的。

我当前的管道如下所示:

p = beam.Pipeline(options=PipelineOptions(pipeline_args))

(p
 | 'Read from a File' >> beam.io.Read(training_files_folder)
 | 'String To BigQuery Row' >> beam.Map(lambda s:
                                        data_ingestion.parse_method(s))
 | 'Write to BigQuery' >> beam.io.Write(
            beam.io.BigQuerySink(
                known_args.output,
                schema='title:STRING,text:STRING,id:STRING',
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run().wait_until_finish()
Run Code Online (Sandbox Code Playgroud)

我收到的错误消息是:

File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.2.1\helpers\pydev\pydevd.py", line 1664, in <module>
main()

File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.2.1\helpers\pydev\pydevd.py", line 1658, in main
globals = debugger.run(setup['file'], None, None, is_module)

File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.2.1\helpers\pydev\pydevd.py", line 1068, in run
pydev_imports.execfile(file, globals, locals)  # execute the script

File "C:/Users/Tomer/PycharmProjects/hyperpartisan/cloud-version/data_ingestion.py", line 135, in <module>
run()

File "C:/Users/Tomer/PycharmProjects/hyperpartisan/cloud-version/data_ingestion.py", line 130, in run
p.run().wait_until_finish()
File "C:\Users\Tomer\anaconda\envs\hyperpartisan\lib\site-packages\apache_beam\runners\direct\direct_runner.py", line 421, in wait_until_finish
self._executor.await_completion()
File "C:\Users\Tomer\anaconda\envs\hyperpartisan\lib\site-packages\apache_beam\runners\direct\executor.py", line 398, in await_completion
self._executor.await_completion()
File "C:\Users\Tomer\anaconda\envs\hyperpartisan\lib\site-packages\apache_beam\runners\direct\executor.py", line 444, in await_completion
six.reraise(t, v, tb)
File "C:\Users\Tomer\anaconda\envs\hyperpartisan\lib\site-packages\apache_beam\runners\direct\executor.py", line 341, in call
finish_state)
File "C:\Users\Tomer\anaconda\envs\hyperpartisan\lib\site-packages\apache_beam\runners\direct\executor.py", line 366, in attempt_call
side_input_values)
File "C:\Users\Tomer\anaconda\envs\hyperpartisan\lib\site-packages\apache_beam\runners\direct\transform_evaluator.py", line 109, in get_evaluator
input_committed_bundle, side_inputs)
File "C:\Users\Tomer\anaconda\envs\hyperpartisan\lib\site-packages\apache_beam\runners\direct\transform_evaluator.py", line 283, in __init__
self._source.pipeline_options = evaluation_context.pipeline_options
AttributeError: 'str' object has no attribute 'pipeline_options'
Run Code Online (Sandbox Code Playgroud)

非常感谢任何帮助。谢谢托默

解决了第一个问题:事实证明这不适用于 DirectRunner,将运行器更改为 DataFlowRunner 并将Read替换为ReadFromText解决了该异常:

p = 束.Pipeline(选项=PipelineOptions(pipeline_args))

(p
 | 'Read from a File' >> beam.io.ReadFromText(training_files_folder)
 | 'String To BigQuery Row' >> beam.Map(lambda s:
                                        data_ingestion.parse_method(s))
 | 'Write to BigQuery' >> beam.io.Write(
            beam.io.BigQuerySink(
                known_args.output,
                schema='title:STRING,text:STRING,id:STRING',
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run().wait_until_finish() 
Run Code Online (Sandbox Code Playgroud)

但现在我看到这种方法为我提供了每个文件中的一行作为管道元素,而我希望将整个文件作为字符串作为每个元素。不知道该怎么做。我找到了这篇文章,但它是用 java 编写的,不确定它如何与 python 和 gcs 版本一起使用。

所以看起来 ReadFromText 不适用于我的用例,而且我不知道如何创建文件管道。

解决方案:感谢 Ankur 的帮助,我修改了代码,包括从 MatchResult 对象列表转换所需的步骤,这是 GCSFileSystem 返回的字符串 pCollection,每个字符串代表一个文件。

p = beam.Pipeline(options=PipelineOptions(pipeline_args))
gcs = GCSFileSystem(PipelineOptions(pipeline_args))
gcs_reader = GCSFileReader(gcs)

(p
 | 'Read Files' >> beam.Create([m.metadata_list for m in gcs.match([training_files_folder])])
 | 'metadata_list to filepath' >> beam.FlatMap(lambda metadata_list: [metadata.path for metadata in metadata_list])
 | 'string To BigQuery Row' >> beam.Map(lambda filepath:
                                        data_ingestion.parse_method(gcs_reader.get_string_from_filepath(filepath)))
 | 'Write to BigQuery' >> beam.io.Write(
            beam.io.BigQuerySink(
                known_args.output,
                schema='title:STRING,text:STRING,id:STRING',
                # Creates the table in BigQuery if it does not yet exist.
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                # Appends data to the BigQuery table
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))
p.run().wait_until_finish()
Run Code Online (Sandbox Code Playgroud)

该代码使用此帮助程序类来读取 gcs 文件:

class GCSFileReader:
  """Helper class to read gcs files"""
  def __init__(self, gcs):
      self.gcs = gcs

  def get_string_from_filepath(self,filepath):
      with self.gcs.open(filepath) as reader:
          res = reader.read()

      return res
Run Code Online (Sandbox Code Playgroud)