相关疑难解决方法(0)

从Apache Beam中的多个文件夹中读取文件,并将输出映射到文件名

继续从多个文件夹中读取文件,然后使用python sdk和dataflow runner将文件名如(filecontents,filename)输出到apache beam中的bigquery.

原本以为我可以为每个文件创建一个pcollection,然后使用文件名映射文件内容.

def read_documents(pipeline):
  """Read the documents at the provided uris and returns (uri, line) pairs."""
  pcolls = []
  count = 0
  with open(TESTIN) as uris:
       for uri in uris:
    #print str(uri).strip("[]/'")
         pcolls.append(
         pipeline
         | 'Read: uri' + str(uri)  >>ReadFromText(str(uri).strip("[]/'"), compression_type = 'gzip')
         | 'WithKey: uri'  + str(uri)   >> beam.Map(lambda v, uri: (v, str(uri).strip("[]")), uri) 
         )
       return pcolls | 'FlattenReadPColls' >> beam.Flatten()
Run Code Online (Sandbox Code Playgroud)

这工作正常,但速度很慢,大约10000个文件后无法在数据流云上工作.如果超过10000个文件,它将遭受破损的管道.

目前正试图从Text.io重载ReadAllFromText函数.Text.io旨在从文件名或模式的pcollection中快速读取大量文件.如果从Google云端存储中读取并且该文件具有内容编码,则此模块中存在错误.谷歌云存储自动枪杀文件并对其进行转码,但由于某些原因,ReadAllFromText无法使用它.您必须更改文件的元数据以删除内容编码,并将ReadAllFromText上的压缩类型设置为gzip.我将此问题包含在内,以防其他人遇到ReadAllFromText问题 https://issues.apache.org/jira/browse/BEAM-1874

我目前的代码看起来像这样

class ReadFromGs(ReadAllFromText):

    def __init__(self):
        super(ReadFromGs, self).__init__(compression_type="gzip")

    def expand(self, pvalue):
        files = …
Run Code Online (Sandbox Code Playgroud)

python google-cloud-platform google-cloud-dataflow apache-beam

8
推荐指数
1
解决办法
1217
查看次数

如何在google-cloud-dataflow中使用文件模式匹配时获取文件名

在google-cloud-dataflow中使用文件模式匹配时,有人知道如何获取文件名吗?

我是新手使用数据流.以这种方式使用文件模式匹配时如何获取文件名.

p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*.txt"))
Run Code Online (Sandbox Code Playgroud)

我想如何检测kinglear.txt,Hamlet.txt等文件名.

google-cloud-dataflow

5
推荐指数
1
解决办法
2402
查看次数

Apache Beam TextIO glob 获取原始文件名

我已经设置了管道。我必须解析数百个 *.gz 文件。因此 glob 效果很好。

但我需要当前处理的文件的原始名称,因为我想将结果文件命名为原始文件。

有人能帮我一下吗?

这是我的代码。

@Default.String(LOGS_PATH + "*.gz")
String getInputFile();
void setInputFile(String value);


    TextIO.Read read = TextIO.read().withCompressionType(TextIO.CompressionType.GZIP).from(options.getInputFile());
        read.getName();

        p.apply("ReadLines", read).apply(new CountWords())
         .apply(MapElements.via(new FormatAsTextFn()))
         .apply("WriteCounts", TextIO.write().to(WordCountOptions.LOGS_PATH + "_" + options.getOutput()));

    p.run().waitUntilFinish();
Run Code Online (Sandbox Code Playgroud)

java clob google-cloud-dataflow apache-beam

3
推荐指数
1
解决办法
2213
查看次数

使用 Google Cloud DataFlow python sdk 读取一组 xml 文件

我正在尝试从 GCS 存储桶读取 XML 文件的集合并处理它们,其中集合中的每个元素都是代表整个文件的字符串,但我找不到关于如何完成此操作的合适示例,我也无法理解它来自 Apache Beam 文档,主要是关于 Java 版本的。

我当前的管道如下所示:

p = beam.Pipeline(options=PipelineOptions(pipeline_args))

(p
 | 'Read from a File' >> beam.io.Read(training_files_folder)
 | 'String To BigQuery Row' >> beam.Map(lambda s:
                                        data_ingestion.parse_method(s))
 | 'Write to BigQuery' >> beam.io.Write(
            beam.io.BigQuerySink(
                known_args.output,
                schema='title:STRING,text:STRING,id:STRING',
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run().wait_until_finish()
Run Code Online (Sandbox Code Playgroud)

我收到的错误消息是:

File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.2.1\helpers\pydev\pydevd.py", line 1664, in <module>
main()

File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.2.1\helpers\pydev\pydevd.py", line 1658, in main
globals = debugger.run(setup['file'], None, None, is_module)

File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.2.1\helpers\pydev\pydevd.py", line 1068, in …
Run Code Online (Sandbox Code Playgroud)

python google-cloud-dataflow

3
推荐指数
1
解决办法
2993
查看次数

如何从Dataflow中的PCollection读取bigQuery

我有一个从pubsub获得的Object的PCollection,可以这样说:

 PCollection<Student> pStudent ;
Run Code Online (Sandbox Code Playgroud)

在学生属性中,有一个属性,比如说studentID;并且我想使用此学生ID从BigQuery读取属性(class_code),并将我从BQ获取的class_code设置为PCollcetion中的Student Object

有谁知道如何实现这一目标?我知道在Beam中有一个,BigQueryIO但是如果我要在BQ中执行的查询字符串条件来自PCollection中的学生对象(studentID),那么我该怎么办?如何从BigQuery的结果中将值设置为PCollection ?

google-bigquery google-cloud-dataflow apache-beam

2
推荐指数
1
解决办法
1256
查看次数