亚马逊EMR上的Spark:"超时等待来自池的连接"

cla*_*lay 15 amazon-emr apache-spark

我正在小型三台服务器Amazon EMR 5(Spark 2.0)集群上运行Spark工作.我的工作运行了一个小时左右,因以下错误而失败.我可以手动重启并运行,处理更多数据,最终再次失败.

我的Spark代码非常简单,不直接使用任何Amazon或S3 API.我的Spark代码将S3文本字符串路径传递给Spark,Spark在内部使用S3.

我的Spark程序在循环中执行以下操作:从S3加载数据 - >处理 - >将数据写入S3上的不同位置.

我的第一个怀疑是,某些内部Amazon或Spark代码未正确处理连接,并且连接池已耗尽.

com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.AmazonClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
            at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:618)
            at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:376)
            at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:338)
            at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:287)
            at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3826)
            at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1015)
            at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:991)
            at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:212)
            at sun.reflect.GeneratedMethodAccessor45.invoke(Unknown Source)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
            at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
            at com.sun.proxy.$Proxy44.retrieveMetadata(Unknown Source)
            at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:780)
            at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1428)
            at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.exists(EmrFileSystem.java:313)
            at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:85)
            at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:60)
            at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:58)
            at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
            at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
            at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
            at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86)
            at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86)
            at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:487)
            at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
            at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:194)
            at sun.reflect.GeneratedMethodAccessor85.invoke(Unknown Source)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
            at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
            at py4j.Gateway.invoke(Gateway.java:280)
            at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
            at py4j.commands.CallCommand.execute(CallCommand.java:79)
            at py4j.GatewayConnection.run(GatewayConnection.java:211)
            at java.lang.Thread.run(Thread.java:745)
    Caused by: com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
            at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingClientConnectionManager.leaseConnection(PoolingClientConnectionManager.java:226)
            at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingClientConnectionManager$1.getConnection(PoolingClientConnectionManager.java:195)
            at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70)
            at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.$Proxy45.getConnection(Unknown Source)
            at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:423)
            at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863)
            at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
            at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57)
            at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:837)
            at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:607)
            ... 41 more
Run Code Online (Sandbox Code Playgroud)

hid*_*bit 10

我在EMR上有一个非常简单的程序遇到了这个问题(从S3读取数据,过滤,写入S3).

我可以通过使用S3A文件系统的实现和设置解决这个问题fs.s3a.connection.maximum,以100有一个更大的连接池.(默认为15;请参阅Hadoop-AWS模块:与Amazon Web Services集成以获取更多配置属性)

这是我设置配置的方式:

// in Scala
val hc = sc.hadoopConfiguration

// in Python (not tested)
hc = sc._jsc.hadoopConfiguration()

// setting the config is the same for both languages
hc.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hc.setInt("fs.s3a.connection.maximum", 100)
Run Code Online (Sandbox Code Playgroud)

为了使它工作,传递给Spark的S3 URI必须从头开始 s3a://...

  • @Markus:这个页面展示了如何从 Hadoop 连接到 S3 的一般方法 - 有 `s3a` 是要走的路。但是这个问题是关于从 AWS EMR 访问 S3,这里应该使用 `s3`,因为 EMR 提供了专有的 Amazon EMRFS 来访问具有更高性能的 S3。 (2认同)

Evi*_*ter 5

通过在 emrfs-site 配置中将 fs.s3.maxConnections 设置为大于默认值 500 的值,也可以在保留 EMRFS 时解决此问题

https://aws.amazon.com/premiumsupport/knowledge-center/emr-timeout-connection-wait/

如果使用 Java SDK 启动 EMR 集群,您可以使用 方法进行设置withConfigurations(这比通过修改文件手动完成要容易得多)。另请参阅/sf/answers/3681654091/

您可以使用 EMR 中的“配置”选项卡检查此项设置是否正确,例如

在此输入图像描述