继续从多个文件夹中读取文件,然后使用python sdk和dataflow runner将文件名如(filecontents,filename)输出到apache beam中的bigquery.
原本以为我可以为每个文件创建一个pcollection,然后使用文件名映射文件内容.
def read_documents(pipeline):
"""Read the documents at the provided uris and returns (uri, line) pairs."""
pcolls = []
count = 0
with open(TESTIN) as uris:
for uri in uris:
#print str(uri).strip("[]/'")
pcolls.append(
pipeline
| 'Read: uri' + str(uri) >>ReadFromText(str(uri).strip("[]/'"), compression_type = 'gzip')
| 'WithKey: uri' + str(uri) >> beam.Map(lambda v, uri: (v, str(uri).strip("[]")), uri)
)
return pcolls | 'FlattenReadPColls' >> beam.Flatten()
Run Code Online (Sandbox Code Playgroud)
这工作正常,但速度很慢,大约10000个文件后无法在数据流云上工作.如果超过10000个文件,它将遭受破损的管道.
目前正试图从Text.io重载ReadAllFromText函数.Text.io旨在从文件名或模式的pcollection中快速读取大量文件.如果从Google云端存储中读取并且该文件具有内容编码,则此模块中存在错误.谷歌云存储自动枪杀文件并对其进行转码,但由于某些原因,ReadAllFromText无法使用它.您必须更改文件的元数据以删除内容编码,并将ReadAllFromText上的压缩类型设置为gzip.我将此问题包含在内,以防其他人遇到ReadAllFromText问题 https://issues.apache.org/jira/browse/BEAM-1874
我目前的代码看起来像这样
class ReadFromGs(ReadAllFromText):
def __init__(self):
super(ReadFromGs, self).__init__(compression_type="gzip")
def expand(self, pvalue):
files = …
Run Code Online (Sandbox Code Playgroud) python google-cloud-platform google-cloud-dataflow apache-beam
在google-cloud-dataflow中使用文件模式匹配时,有人知道如何获取文件名吗?
我是新手使用数据流.以这种方式使用文件模式匹配时如何获取文件名.
p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*.txt"))
Run Code Online (Sandbox Code Playgroud)
我想如何检测kinglear.txt,Hamlet.txt等文件名.
我已经设置了管道。我必须解析数百个 *.gz 文件。因此 glob 效果很好。
但我需要当前处理的文件的原始名称,因为我想将结果文件命名为原始文件。
有人能帮我一下吗?
这是我的代码。
@Default.String(LOGS_PATH + "*.gz")
String getInputFile();
void setInputFile(String value);
TextIO.Read read = TextIO.read().withCompressionType(TextIO.CompressionType.GZIP).from(options.getInputFile());
read.getName();
p.apply("ReadLines", read).apply(new CountWords())
.apply(MapElements.via(new FormatAsTextFn()))
.apply("WriteCounts", TextIO.write().to(WordCountOptions.LOGS_PATH + "_" + options.getOutput()));
p.run().waitUntilFinish();
Run Code Online (Sandbox Code Playgroud) 我正在尝试从 GCS 存储桶读取 XML 文件的集合并处理它们,其中集合中的每个元素都是代表整个文件的字符串,但我找不到关于如何完成此操作的合适示例,我也无法理解它来自 Apache Beam 文档,主要是关于 Java 版本的。
我当前的管道如下所示:
p = beam.Pipeline(options=PipelineOptions(pipeline_args))
(p
| 'Read from a File' >> beam.io.Read(training_files_folder)
| 'String To BigQuery Row' >> beam.Map(lambda s:
data_ingestion.parse_method(s))
| 'Write to BigQuery' >> beam.io.Write(
beam.io.BigQuerySink(
known_args.output,
schema='title:STRING,text:STRING,id:STRING',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run().wait_until_finish()
Run Code Online (Sandbox Code Playgroud)
我收到的错误消息是:
File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.2.1\helpers\pydev\pydevd.py", line 1664, in <module>
main()
File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.2.1\helpers\pydev\pydevd.py", line 1658, in main
globals = debugger.run(setup['file'], None, None, is_module)
File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.2.1\helpers\pydev\pydevd.py", line 1068, in …
Run Code Online (Sandbox Code Playgroud) 我有一个从pubsub获得的Object的PCollection,可以这样说:
PCollection<Student> pStudent ;
Run Code Online (Sandbox Code Playgroud)
在学生属性中,有一个属性,比如说studentID;并且我想使用此学生ID从BigQuery读取属性(class_code),并将我从BQ获取的class_code设置为PCollcetion中的Student Object
有谁知道如何实现这一目标?我知道在Beam中有一个,BigQueryIO
但是如果我要在BQ中执行的查询字符串条件来自PCollection中的学生对象(studentID),那么我该怎么办?如何从BigQuery的结果中将值设置为PCollection ?