我将大小约为650GB的BigQuery数据集导出到GCS上的Avro文件,并运行数据流程序来处理这些Avro文件.但是,即使只处理了一个大小约为1.31GB的Avro文件,也会遇到OutOfMemoryError异常.
我收到以下错误消息,似乎异常来自AvroIO和Avro库:
Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:260)
at org.apache.avro.io.ValidatingDecoder.readString(ValidatingDecoder.java:107)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:348)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:341)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:177)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:148)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:139)
at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
at org.apache.avro.file.DataFileStream.next(DataFileStream.java:220)
at com.google.cloud.dataflow.sdk.runners.worker.AvroReader$AvroFileIterator.next(AvroReader.java:143)
at com.google.cloud.dataflow.sdk.runners.worker.AvroReader$AvroFileIterator.next(AvroReader.java:113)
at com.google.cloud.dataflow.sdk.util.ReaderUtils.readElemsFromReader(ReaderUtils.java:37)
at com.google.cloud.dataflow.sdk.io.AvroIO.evaluateReadHelper(AvroIO.java:638)
at com.google.cloud.dataflow.sdk.io.AvroIO.access$000(AvroIO.java:118)
at com.google.cloud.dataflow.sdk.io.AvroIO$Read$Bound$1.evaluate(AvroIO.java:294)
at com.google.cloud.dataflow.sdk.io.AvroIO$Read$Bound$1.evaluate(AvroIO.java:290)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:611)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:200)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:196)
at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:109)
at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:204)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:584)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:328)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:70)
at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:145)
at com.htc.studio.bdi.dataflow.ActTranGenerator.main(ActTranGenerator.java:224)
Run Code Online (Sandbox Code Playgroud)
对此例外的任何建议?
谢谢!
我正在使用 X 大小和 Y 周期的滑动时间窗口。为了标记每个窗口的输出,我想获取PCollection当前窗口的时间戳。
PCollection<T> windowedInput = input
.apply(Window<T>into(
SlidingWindows.of(Duration.standardMinutes(10))
.every(Duration.standardMinutes(1))));
// Extract key from each input and run a function per group.
//
// Q: ExtractKey() depends on the window triggered time.
// How can I pass the timestamp of windowedInputs to ExtractKey()?
PCollection<KV<K, Iterable<T>>> groupedInputs = windowedInputs
.apply(ParDo.of(new ExtractKey()))
.apply(GroupByKey.<K, Ts>create());
// Run Story clustering and write outputs.
//
// Q: Also I'd like to add a window timestamp suffix to the output.
// How can I …Run Code Online (Sandbox Code Playgroud) 我正在使用Spotify Scio来读取从Stackdriver导出到Google云端存储的日志.它们是JSON文件,其中每一行都是单个条目.查看工作日志,似乎文件被拆分为块,然后以任何顺序读取.在这种情况下,我已经将我的工作限制在1名工人身上.有没有办法强制按顺序读取和处理这些块?
作为一个例子(textFile基本上是一个TextIO.Read):
val sc = ScioContext(myOptions)
sc.textFile(myFile).map(line => logger.info(line))
Run Code Online (Sandbox Code Playgroud)
将根据工作日志生成类似于此的输出:
line 5
line 6
line 7
line 8
<Some other work>
line 1
line 2
line 3
line 4
<Some other work>
line 9
line 10
line 11
line 12
Run Code Online (Sandbox Code Playgroud)
我想知道的是,是否有办法迫使它按顺序读取1-12行.我发现gzipping文件并使用指定的CompressionType读取它是一种解决方法,但我想知道是否有任何方法可以执行此操作,不涉及压缩或更改原始文件.
我正在使用 BigQueryIO 从 Google Dataflow 作业将数据发布到 BigQuery。
AFAIK,BigQuery 可用于查询来自 Google Cloud Storage、Google Drive 和 Google Sheets 的数据。
但是当我们使用 BigQueryIO 存储数据时,数据将存储在哪里?它在 Google Cloud Storage 中吗?
简单的 gcloud 数据流管道:
PubsubIO.readStrings().fromSubscription -> Window -> ParDo -> DatastoreIO.v1().write()
当负载应用于 pubsub 主题时,消息会被读取但不会被确认:
2017 年 7 月 25 日下午 4:20:38 org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource$PubsubReader stats INFO:Pubsub projects/my-project/subscriptions/my-subscription 已收到 1000 条消息,950 条当前未读消息,843346 个当前未读字节,970 个当前飞行中消息,28367ms 最旧飞行中,1 个当前飞行中检查点,2 个最大飞行中检查点,770B/s 最近读取,1000 个最近接收,0 个最近扩展,0 个最近延迟扩展, 50 最近确认, 990 最近 NACK , 0 最近过期, 898 毫秒最近消息时间戳偏斜, 9224873061464212 毫秒最近水印偏斜, 0 最近延迟消息, 2017-07-25T23:16:49.437Z 最后报告水印
哪个管道步骤应该确认消息?
google-cloud-datastore google-cloud-pubsub google-cloud-dataflow
在Apache的梁编程指南包含以下规则:
3.2.2。不变性
PCollection是不可变的。创建后,您将无法添加,删除或更改单个元素。Beam变换可以处理PCollection的每个元素并生成新的管道数据(作为新的PCollection),但是它不会消耗或修改原始的输入集合。
这是否意味着我不能,一定不能或不应该在自定义转换中修改单个元素?具体来说,我正在使用python SDK并考虑将dict {key: "data"}作为输入,进行一些处理并添加更多字段的转换情况{other_key: "some more data"}。我对上述规则3.2.2的解释是,我应该这样
def process(self,element):
import copy
output = copy.deepcopy(element)
output[other_key] = some_data
yield output
Run Code Online (Sandbox Code Playgroud)
但我想知道这是否有点过大。
使用TestPipeline,我发现如果我在process()方法中对其进行操作,则输入集合的元素也会被修改(除非这些元素是基本类型,例如int,float,bool ...)。
变异元素被认为是绝对不行的,还是仅需谨慎的一种做法?
我正在对数据流批处理负载进行性能基准测试,发现与Bigquery命令行工具上的相同负载相比,负载太慢了.
文件大小约为20 MB,有数百万条记录.我尝试了不同的机器类型,并且n1-highmem-4在加载目标BQ表时加载时间为8分钟时获得了最佳的负载性能.
通过在命令行实用程序上运行BQ命令来应用相同的表加载时,处理和加载相同数量的数据几乎不需要2分钟.有关使用Dataflow作业的负载性能不佳的任何见解?如何提高性能使其与BQ命令行实用程序相媲美?
我正在使用Apache Beam for Java,并且正在使用Cloud DLP API和Cloud Dataflow.作业开始,但在运行时出错.
我认为将DataLink上运行的gRPC库版本与DLP API的客户端库相结合是一个问题,但我不知道要指定哪个版本.
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-dlp</artifactId>
<version>0.33.0-beta</version>
</dependency>
<dependency>
<groupId>com.google.api</groupId>
<artifactId>gax</artifactId>
<version>1.16.0</version>
</dependency>
<dependency>
<groupId>com.google.api</groupId>
<artifactId>gax-grpc</artifactId>
<version>0.20.0</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.2.0</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)
java.lang.RuntimeException: java.lang.NoClassDefFoundError: com/google/api/gax/grpc/ProtoOperationTransformers$ResponseTransformer
at org.sinmetal.mlapi.DataLossPreventionFn.processElement(DataLossPreventionFn.java:52)
Caused by: java.lang.NoClassDefFoundError: com/google/api/gax/grpc/ProtoOperationTransformers$ResponseTransformer
at com.google.cloud.dlp.v2beta1.DlpServiceSettings$Builder.initDefaults(DlpServiceSettings.java:425)
at com.google.cloud.dlp.v2beta1.DlpServiceSettings$Builder.<init>(DlpServiceSettings.java:363)
at com.google.cloud.dlp.v2beta1.DlpServiceSettings$Builder.createDefault(DlpServiceSettings.java:367)
at com.google.cloud.dlp.v2beta1.DlpServiceSettings$Builder.access$000(DlpServiceSettings.java:264)
at com.google.cloud.dlp.v2beta1.DlpServiceSettings.newBuilder(DlpServiceSettings.java:233)
at com.google.cloud.dlp.v2beta1.DlpServiceClient.create(DlpServiceClient.java:149)
at org.sinmetal.mlapi.DataLossPreventionFn.processElement(DataLossPreventionFn.java:26)
at org.sinmetal.mlapi.DataLossPreventionFn$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
at com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:324)
at com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:48)
at com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
at com.google.cloud.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:272)
at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436)
at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424)
at org.apache.beam.sdk.io.gcp.bigquery.PassThroughThenCleanup$IdentityFn.processElement(PassThroughThenCleanup.java:83)
at …Run Code Online (Sandbox Code Playgroud) 在Dataflow/Apache Beam程序中说,我正在尝试读取具有指数级增长数据的表.我想提高读取的性能.
BigQueryIO.Read.from("projectid:dataset.tablename")
Run Code Online (Sandbox Code Playgroud)
要么
BigQueryIO.Read.fromQuery("SELECT A, B FROM [projectid:dataset.tablename]")
Run Code Online (Sandbox Code Playgroud)
如果我只选择表中所需的列,而不是上面的整个表,那么我的读取性能会提高吗?
我知道选择几列会降低成本.但是想知道上面的读取性能.
我已经使用Beam一段时间了,我想知道编写高效且优化的Beam管道的关键概念是什么。
我有一些Spark背景知识,并且我知道我们可能更喜欢使用reduceByKey而不是groupByKey以避免混洗并优化网络流量。
Beam也一样吗?
我将不胜感激一些技巧或材料/最佳实践。