Apache Spark exception reading from S3: Premature end of Content-Length delimited message body (expected: 2,250,236; received: 16,360)

Rom*_*ler 3 amazon-s3 apache-spark apache-spark-sql ibm-cloud-storage

I want to create an Apache Spark DataFrame from an S3 resource. I have tried this on both AWS S3 and IBM Cloud Object Storage, and both fail with

org.apache.spark.util.TaskCompletionListenerException: Premature end of Content-Length delimited message body (expected: 2,250,236; received: 16,360)

I am running pyspark with

./pyspark --packages com.amazonaws:aws-java-sdk-pom:1.11.828,org.apache.hadoop:hadoop-aws:2.7.0

I set the S3 configuration for IBM with

sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "xx")
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "xx")
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3.eu-de.cloud-object-storage.appdomain.cloud")

or for AWS with

sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "xx")
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "xx")
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3.us-west-2.amazonaws.com")

In both cases the read itself is

df = spark.read.csv("s3a://drill-test/cases.csv")

It fails with the exception

org.apache.spark.util.TaskCompletionListenerException: Premature end of Content-Length delimited message body (expected: 2,250,236; received: 16,360)

小智 5

This one can be really confusing.

The error is:

org.apache.spark.util.TaskCompletionListenerException: Premature end of Content-Length delimited message body (expected: 2,250,236; received: 16,360)

This is S3 telling you that an error occurred while talking to S3. My guess is that you are running an older version of the Spark S3 connector that does not understand the error, so what comes back is an XML error message rather than the file, which is why the received byte count (16,360) is so much smaller than the expected Content-Length (2,250,236).
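To confirm that, you can fetch the object outside Spark and compare sizes. Here is a rough diagnostic sketch with boto3 (my suggestion, not verified against your setup; the credentials, endpoint, bucket, and key are the ones from the question):

# Diagnostic sketch, assuming boto3 is installed. Bucket "drill-test" and
# key "cases.csv" are taken from the question's read call.
import boto3

s3 = boto3.client(
    "s3",
    aws_access_key_id="xx",
    aws_secret_access_key="xx",
    endpoint_url="https://s3.eu-de.cloud-object-storage.appdomain.cloud",
)

# HEAD the object: ContentLength should match the "expected" size (2,250,236).
head = s3.head_object(Bucket="drill-test", Key="cases.csv")
print(head["ContentLength"])

# Read the first bytes: if this prints CSV data, the object itself is fine
# and the problem lies in the S3 connector Spark is loading.
body = s3.get_object(Bucket="drill-test", Key="cases.csv")["Body"]
print(body.read(200))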

The following settings helped in my case. Put them above your read call and fill in <aws_key>, <aws_secret>, and <aws_region>:

hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
# s3a reads its credentials from fs.s3a.access.key / fs.s3a.secret.key
# (the fs.s3a.awsAccessKeyId variants belong to the older s3/s3n connectors).
hadoop_conf.set("fs.s3a.access.key", "<aws_key>")
hadoop_conf.set("fs.s3a.secret.key", "<aws_secret>")
# Bind s3a:// URLs to the S3A filesystem. Do not set this to
# NativeS3FileSystem, which implements the older s3n:// scheme.
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
# Sign requests with AWS Signature Version 4.
hadoop_conf.set("com.amazonaws.services.s3.enableV4", "true")
hadoop_conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider")
hadoop_conf.set("fs.s3a.endpoint", "s3.<aws_region>.amazonaws.com")
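If setting com.amazonaws.services.s3.enableV4 on the Hadoop configuration has no effect, a commonly suggested variant (untested here) is to pass the flag as a JVM system property when launching pyspark, since it is read by the AWS SDK rather than by Hadoop. Note also that hadoop-aws 2.7.0 was built against a much older aws-java-sdk than the 1.11.828 pom used in the question, so keeping those two packages in step can matter:

./pyspark --packages com.amazonaws:aws-java-sdk-pom:1.11.828,org.apache.hadoop:hadoop-aws:2.7.0 \
  --driver-java-options "-Dcom.amazonaws.services.s3.enableV4=true" \
  --conf "spark.executor.extraJavaOptions=-Dcom.amazonaws.services.s3.enableV4=true"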

Good luck!