标签: google-cloud-dataflow

在GCS上读取Avro文件时出现OutOfMemoryError异常

我将大小约为650GB的BigQuery数据集导出到GCS上的Avro文件,并运行数据流程序来处理这些Avro文件.但是,即使只处理了一个大小约为1.31GB的Avro文件,也会遇到OutOfMemoryError异常.

我收到以下错误消息,似乎异常来自AvroIO和Avro库:

Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
        at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:260)
        at org.apache.avro.io.ValidatingDecoder.readString(ValidatingDecoder.java:107)
        at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:348)
        at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:341)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:177)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:148)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:139)
        at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
        at org.apache.avro.file.DataFileStream.next(DataFileStream.java:220)
        at com.google.cloud.dataflow.sdk.runners.worker.AvroReader$AvroFileIterator.next(AvroReader.java:143)
        at com.google.cloud.dataflow.sdk.runners.worker.AvroReader$AvroFileIterator.next(AvroReader.java:113)
        at com.google.cloud.dataflow.sdk.util.ReaderUtils.readElemsFromReader(ReaderUtils.java:37)
        at com.google.cloud.dataflow.sdk.io.AvroIO.evaluateReadHelper(AvroIO.java:638)
        at com.google.cloud.dataflow.sdk.io.AvroIO.access$000(AvroIO.java:118)
        at com.google.cloud.dataflow.sdk.io.AvroIO$Read$Bound$1.evaluate(AvroIO.java:294)
        at com.google.cloud.dataflow.sdk.io.AvroIO$Read$Bound$1.evaluate(AvroIO.java:290)
        at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:611)
        at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:200)
        at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:196)
        at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:109)
        at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:204)
        at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:584)
        at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:328)
        at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:70)
        at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:145)
        at com.htc.studio.bdi.dataflow.ActTranGenerator.main(ActTranGenerator.java:224)
Run Code Online (Sandbox Code Playgroud)

对此例外的任何建议?

谢谢!

google-cloud-dataflow

0
推荐指数
1
解决办法
1144
查看次数

如何获取当前滑动窗口的最大时间戳

我正在使用 X 大小和 Y 周期的滑动时间窗口。为了标记每个窗口的输出,我想获取PCollection当前窗口的时间戳。

    PCollection<T> windowedInput = input
      .apply(Window<T>into(
          SlidingWindows.of(Duration.standardMinutes(10))
                        .every(Duration.standardMinutes(1))));

   // Extract key from each input and run a function per group.
   //
   // Q: ExtractKey() depends on the window triggered time.
   //    How can I pass the timestamp of windowedInputs to ExtractKey()?
   PCollection<KV<K, Iterable<T>>> groupedInputs = windowedInputs
     .apply(ParDo.of(new ExtractKey()))
     .apply(GroupByKey.<K, Ts>create());

   // Run Story clustering and write outputs.
   //
   // Q: Also I'd like to add a window timestamp suffix to the output.
   //    How can I …
Run Code Online (Sandbox Code Playgroud)

google-cloud-dataflow

0
推荐指数
1
解决办法
742
查看次数

在Google Cloud Dataflow中按顺序读取文件

我正在使用Spotify Scio来读取从Stackdriver导出到Google云端存储的日志.它们是JSON文件,其中每一行都是单个条目.查看工作日志,似乎文件被拆分为块,然后以任何顺序读取.在这种情况下,我已经将我的工作限制在1名工人身上.有没有办法强制按顺序读取和处理这些块?

作为一个例子(textFile基本上是一个TextIO.Read):

val sc = ScioContext(myOptions)
sc.textFile(myFile).map(line => logger.info(line))
Run Code Online (Sandbox Code Playgroud)

将根据工作日志生成类似于此的输出:

line 5
line 6
line 7
line 8
<Some other work>
line 1
line 2
line 3
line 4
<Some other work>
line 9
line 10
line 11
line 12
Run Code Online (Sandbox Code Playgroud)

我想知道的是,是否有办法迫使它按顺序读取1-12行.我发现gzipping文件并使用指定的CompressionType读取它是一种解决方法,但我想知道是否有任何方法可以执行此操作,不涉及压缩或更改原始文件.

google-cloud-platform google-cloud-dataflow spotify-scio

0
推荐指数
1
解决办法
597
查看次数

BigQuery 将存储数据的位置

我正在使用 BigQueryIO 从 Google Dataflow 作业将数据发布到 BigQuery。

AFAIK,BigQuery 可用于查询来自 Google Cloud Storage、Google Drive 和 Google Sheets 的数据。

但是当我们使用 BigQueryIO 存储数据时,数据将存储在哪里?它在 Google Cloud Storage 中吗?

google-cloud-storage google-bigquery google-cloud-dataflow

0
推荐指数
2
解决办法
3038
查看次数

DataFlow 不确认 PubSub 消息

简单的 gcloud 数据流管道:

PubsubIO.readStrings().fromSubscription -> Window -> ParDo -> DatastoreIO.v1().write()

当负载应用于 pubsub 主题时,消息会被读取但不会被确认

2017 年 7 月 25 日下午 4:20:38 org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource$PubsubReader stats INFO:Pubsub projects/my-project/subscriptions/my-subscription 已收到 1000 条消息,950 条当前未读消息,843346 个当前未读字节,970 个当前飞行中消息,28367ms 最旧飞行中,1 个当前飞行中检查点,2 个最大飞行中检查点,770B/s 最近读取,1000 个最近接收,0 个最近扩展,0 个最近延迟扩展, 50 最近确认, 990 最近 NACK , 0 最近过期, 898 毫秒最近消息时间戳偏斜, 9224873061464212 毫秒最近水印偏斜, 0 最近延迟消息, 2017-07-25T23:16:49.437Z 最后报告水印

哪个管道步骤应该确认消息?

  • stackdriver 仪表板显示有一些确认,但未确认的消息数量保持稳定。
  • 跟踪中没有错误消息表明消息处理失败。
  • 条目显示在数据存储中

google-cloud-datastore google-cloud-pubsub google-cloud-dataflow

0
推荐指数
1
解决办法
1891
查看次数

我可以修改Apache Beam转换中的元素吗?

Apache的梁编程指南包含以下规则:

3.2.2。不变性

PCollection是不可变的。创建后,您将无法添加,删除或更改单个元素。Beam变换可以处理PCollection的每个元素并生成新的管道数据(作为新的PCollection),但是它不会消耗或修改原始的输入集合。

这是否意味着我不能,一定不能或不应该在自定义转换中修改单个元素?具体来说,我正在使用python SDK并考虑将dict {key: "data"}作为输入,进行一些处理并添加更多字段的转换情况{other_key: "some more data"}。我对上述规则3.2.2的解释是,我应该这样

def process(self,element):
    import copy
    output = copy.deepcopy(element)
    output[other_key] = some_data
    yield output
Run Code Online (Sandbox Code Playgroud)

但我想知道这是否有点过大。

使用TestPipeline,我发现如果我在process()方法中对其进行操作,则输入集合的元素也会被修改(除非这些元素是基本类型,例如int,float,bool ...)。

变异元素被认为是绝对不行的,还是仅需谨慎的一种做法?

google-cloud-dataflow apache-beam

0
推荐指数
1
解决办法
580
查看次数

使用Apache Beam对Dataflow批量加载的性能问题

我正在对数据流批处理负载进行性能基准测试,发现与Bigquery命令行工具上的相同负载相比,负载太慢了.

文件大小约为20 MB,有数百万条记录.我尝试了不同的机器类型,并且n1-highmem-4在加载目标BQ表时加载时间为8分钟时获得了最佳的负载性能.

通过在命令行实用程序上运行BQ命令来应用相同的表加载时,处理和加载相同数量的数据几乎不需要2分钟.有关使用Dataflow作业的负载性能不佳的任何见解?如何提高性能使其与BQ命令行实用程序相媲美?

google-bigquery google-cloud-dataflow apache-beam

0
推荐指数
1
解决办法
394
查看次数

ProtoOperationTransformers的'Java.lang.NoClassDefFoundError`在运行时发生

我正在使用Apache Beam for Java,并且正在使用Cloud DLP API和Cloud Dataflow.作业开始,但在运行时出错.

我认为将DataLink上运行的gRPC库版本与DLP API的客户端库相结合是一个问题,但我不知道要指定哪个版本.

依赖

<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>google-cloud-dlp</artifactId>
  <version>0.33.0-beta</version>
</dependency>

<dependency>
  <groupId>com.google.api</groupId>
  <artifactId>gax</artifactId>
  <version>1.16.0</version>
</dependency>
<dependency>
  <groupId>com.google.api</groupId>
  <artifactId>gax-grpc</artifactId>
  <version>0.20.0</version>
</dependency>
<dependency>
  <groupId>com.google.protobuf</groupId>
  <artifactId>protobuf-java</artifactId>
  <version>3.2.0</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)

错误

java.lang.RuntimeException: java.lang.NoClassDefFoundError: com/google/api/gax/grpc/ProtoOperationTransformers$ResponseTransformer
    at org.sinmetal.mlapi.DataLossPreventionFn.processElement(DataLossPreventionFn.java:52)
Caused by: java.lang.NoClassDefFoundError: com/google/api/gax/grpc/ProtoOperationTransformers$ResponseTransformer
    at com.google.cloud.dlp.v2beta1.DlpServiceSettings$Builder.initDefaults(DlpServiceSettings.java:425)
    at com.google.cloud.dlp.v2beta1.DlpServiceSettings$Builder.<init>(DlpServiceSettings.java:363)
    at com.google.cloud.dlp.v2beta1.DlpServiceSettings$Builder.createDefault(DlpServiceSettings.java:367)
    at com.google.cloud.dlp.v2beta1.DlpServiceSettings$Builder.access$000(DlpServiceSettings.java:264)
    at com.google.cloud.dlp.v2beta1.DlpServiceSettings.newBuilder(DlpServiceSettings.java:233)
    at com.google.cloud.dlp.v2beta1.DlpServiceClient.create(DlpServiceClient.java:149)
    at org.sinmetal.mlapi.DataLossPreventionFn.processElement(DataLossPreventionFn.java:26)
    at org.sinmetal.mlapi.DataLossPreventionFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
    at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
    at com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:324)
    at com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:48)
    at com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
    at com.google.cloud.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:272)
    at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
    at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
    at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436)
    at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424)
    at org.apache.beam.sdk.io.gcp.bigquery.PassThroughThenCleanup$IdentityFn.processElement(PassThroughThenCleanup.java:83)
    at …
Run Code Online (Sandbox Code Playgroud)

java noclassdeffounderror google-cloud-dataflow apache-beam

0
推荐指数
1
解决办法
210
查看次数

BigQueryIO Read vs fromQuery

在Dataflow/Apache Beam程序中说,我正在尝试读取具有指数级增长数据的表.我想提高读取的性能.

BigQueryIO.Read.from("projectid:dataset.tablename")
Run Code Online (Sandbox Code Playgroud)

要么

BigQueryIO.Read.fromQuery("SELECT A, B FROM [projectid:dataset.tablename]")
Run Code Online (Sandbox Code Playgroud)

如果我只选择表中所需的列,而不是上面的整个表,那么我的读取性能会提高吗?

我知道选择几列会降低成本.但是想知道上面的读取性能.

dataflow google-bigquery google-cloud-dataflow

0
推荐指数
1
解决办法
326
查看次数

Apache Beam-我应该了解编写高效数据处理管道的关键概念是什么?

我已经使用Beam一段时间了,我想知道编写高效且优化的Beam管道的关键概念是什么。

我有一些Spark背景知识,并且我知道我们可能更喜欢使用reduceByKey而不是groupByKey以避免混洗并优化网络流量。

Beam也一样吗?

我将不胜感激一些技巧或材料/最佳实践。

google-cloud-dataflow apache-beam

0
推荐指数
1
解决办法
112
查看次数