从 Google App Engine 应用运行 Google Dataflow 管道?

Bha*_*aja 3 google-bigquery google-cloud-platform google-cloud-dataflow

我正在使用 DataflowPipelineRunner 创建数据流作业。我尝试了以下场景。

  1. 不指定任何 machineType
  2. 配g1小机
  3. 使用 n1-highmem-2

在上述所有场景中,输入是来自 GCS 的一个非常小的文件(KB 大小),输出是大查询表。

我在所有情况下都出现内存不足错误

我编译的代码的大小是 94mb。我只尝试字数统计示例,它没有读取任何输入(在作业开始之前它失败了)。请帮助我理解为什么我会收到此错误。

注意:我正在使用 appengine 来开始这项工作。

注意:相同的代码适用于 beta 版本0.4.150414

编辑 1

根据答案中的建议尝试了以下操作,

  1. 自动缩放切换到基本缩放
  2. 二手机器类型B2,提供 256MB 内存

经过这些配置,Java Heap Memory 问题就解决了。但它试图将一个 jar 上传到超过 10Mb 的暂存位置,因此它失败了。

它记录以下异常

com.google.api.client.http.HttpRequest execute: exception thrown while executing request
com.google.appengine.api.urlfetch.RequestPayloadTooLargeException: The request to https://www.googleapis.com/upload/storage/v1/b/pwccloudedw-stagging-bucket/o?name=appengine-api-L4wtoWwoElWmstI1Ia93cg.jar&uploadType=resumable&upload_id=AEnB2Uo6HCfw6Usa3aXlcOzg0g3RawrvuAxWuOUtQxwQdxoyA0cf22LKqno0Gu-hjKGLqXIo8MF2FHR63zTxrSmQ9Yk9HdCdZQ exceeded the 10 MiB limit.
at com.google.appengine.api.urlfetch.URLFetchServiceImpl.convertApplicationException(URLFetchServiceImpl.java:157)
at com.google.appengine.api.urlfetch.URLFetchServiceImpl.fetch(URLFetchServiceImpl.java:45)
at com.google.apphosting.utils.security.urlfetch.URLFetchServiceStreamHandler$Connection.fetchResponse(URLFetchServiceStreamHandler.java:543)
at com.google.apphosting.utils.security.urlfetch.URLFetchServiceStreamHandler$Connection.getInputStream(URLFetchServiceStreamHandler.java:422)
at com.google.apphosting.utils.security.urlfetch.URLFetchServiceStreamHandler$Connection.getResponseCode(URLFetchServiceStreamHandler.java:275)
at com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:36)
at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:94)
at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:965)
at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequestWithoutGZip(MediaHttpUploader.java:545)
at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequest(MediaHttpUploader.java:562)
at com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:419)
at com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:336)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:427)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:357)
at java.util.concurrent.FutureTask.run(FutureTask.java:260)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1168)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:605)
at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$1$1.run(ApiProxyImpl.java:1152)
at java.security.AccessController.doPrivileged(Native Method)
at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$1.run(ApiProxyImpl.java:1146)
at java.lang.Thread.run(Thread.java:745)
at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$2$1.run(ApiProxyImpl.java:1195)
Run Code Online (Sandbox Code Playgroud)

我尝试直接上传 jar 文件 - appengine-api-1.0-sdk-1.9.20.jar,它仍然尝试上传这个 jar appengine-api-L4wtoWwoElWmstI1Ia93cg.jar。我不知道它是什么罐子。任何关于这个罐子的想法都值得赞赏。

请帮我解决这个问题。

Luk*_*wik 5

简短的回答是,如果您在托管 VM上使用 AppEngine,您将不会遇到 AppEngine 沙箱限制(使用F1 或 B1 实例类时出现 OOM、执行时间限制问题、列入白名单的 JRE 类)。如果您真的想在 App Engine 沙箱中运行,那么您对 ​​Dataflow SDK 的使用最符合 AppEngine 沙箱的限制。下面我将解释常见问题以及人们为符合 AppEngine 沙箱限制所做的工作。

Dataflow SDK 需要一个 AppEngine 实例类,该实例类具有足够的内存来执行用户应用程序以构建管道、暂存任何资源并将作业描述发送到 Dataflow 服务。通常我们已经看到用户需要使用内存超过 128mb的实例类才能看到 OOM 错误。

如果您的应用程序所需的资源已经暂存,那么构建管道并将其提交到 Dataflow 服务通常只需不到几秒钟的时间。将 JAR 和任何其他资源上传到 GCS 可能需要 60 秒以上的时间。这可以通过预先将 JAR 预先暂存到 GCS 来手动解决(如果检测到它们已经存在,Dataflow SDK 将跳过再次暂存它们)或使用任务队列来获得 10 分钟的限制(请注意,对于大型应用程序,10分钟可能不足以暂存您的所有资源)。

最后,在 AppEngine 沙盒环境中,您和您的所有依赖项仅限于使用JRE 中列入白名单的类,否则您将收到如下异常:

java.lang.SecurityException:
  java.lang.IllegalAccessException: YYY is not allowed on ZZZ
  ...
Run Code Online (Sandbox Code Playgroud)

编辑 1

我们对类路径上的 jars 内容执行哈希,并使用修改后的文件名将它们上传到 GCS。AppEngine 使用自己的 JAR 运行沙盒环境,appengine-api-L4wtoWwoElWmstI1Ia93cg.jar指的是appengine-api.jar,它是沙盒环境添加的 jar。您可以从我们的PackageUtil#getUniqueContentName(...) 中看到,我们只是在.jar之前附加-$HASH

我们正在努力解决您看到RequestPayloadToLarge异常的原因,目前建议您设置filesToStage选项并过滤掉执行 Dataflow 不需要的 jar 以解决您面临的问题。您可以看到我们如何使用DataflowPipelineRunner#detectClassPathResourcesToStage(...)构建要暂存的文件。