I am trying to use Apache Spark SQL to sync JSON log data from S3 into Parquet files. My code is basically:
import org.apache.spark._
val sqlContext = new sql.SQLContext(sc)
// The second argument is the sampling ratio used for JSON schema inference.
val data = sqlContext.jsonFile("s3n://...", 10e-6)
data.saveAsParquetFile("s3n://...")
This code works when I have up to 2000 partitions, and fails with 5000 or more, regardless of the volume of data. Normally one could just coalesce the partitions down to an acceptable number, but this is a very large dataset, and at 2000 partitions I run into the problem described in this question:
14/10/10 00:34:32 INFO scheduler.DAGScheduler: Stage 1 (runJob at ParquetTableOperations.scala:318) finished in 759.274 s
14/10/10 00:34:32 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
14/10/10 00:34:32 INFO spark.SparkContext: Job finished: runJob at ParquetTableOperations.scala:318, took 759.469302077 s
14/10/10 00:34:34 WARN hadoop.ParquetOutputCommitter: could not write summary file for ...
java.io.IOException: Could not read footer: java.lang.NullPointerException
at parquet.hadoop.ParquetFileReader.readAllFootersInParallel(ParquetFileReader.java:190)
at parquet.hadoop.ParquetFileReader.readAllFootersInParallel(ParquetFileReader.java:203)
at parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:49)
at …

Tried this:
import boto3
from boto3.s3.transfer import TransferConfig, S3Transfer

path = "/temp/"
fileName = "bigFile.gz"  # this happens to be a 5.9 Gig file
client = boto3.client('s3', region_name=region)  # `region` is defined elsewhere
config = TransferConfig(
    multipart_threshold=4 * 1024,  # in bytes, i.e. 4 KB -- anything larger should go multipart
    max_concurrency=10,
    num_download_attempts=10,
)
transfer = S3Transfer(client, config)
transfer.upload_file(path + fileName, 'bucket', 'key')
Result: a 5.9 gig file on S3. It does not appear to consist of multiple parts.
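As far as I understand, one way to sanity-check that is the object's ETag: S3 usually gives objects assembled from a completed multipart upload an ETag of the form <hash>-<part count>, while single-part uploads get a plain MD5. A minimal sketch of that check, reusing the 'bucket'/'key' placeholders from above:

import boto3

client = boto3.client('s3')  # region/credentials assumed to be configured as above

# HEAD the uploaded object; 'bucket' and 'key' are the same placeholders used above
etag = client.head_object(Bucket='bucket', Key='key')['ETag'].strip('"')

# Completed multipart uploads usually carry an ETag like "<hash>-<number of parts>"
if '-' in etag:
    print("multipart upload,", etag.split('-')[1], "parts")
else:
    print("single-part upload, plain MD5 ETag:", etag)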
I found this example, but `part` is not defined:
import boto3
bucket = 'bucket'
path = "/temp/"
fileName = "bigFile.gz"
key = 'key'
s3 = boto3.client('s3')
# Initiate the multipart upload and send the part(s)
mpu = …
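Since that snippet is cut off and `part` is never defined, here is a minimal sketch of what the manual multipart flow generally looks like with boto3. This is my own sketch, not the example I found: the 100 MB chunk size is an assumption, and bucket/key/path/fileName are the same placeholders as above.

import os
import boto3

bucket = 'bucket'
key = 'key'
path = "/temp/"
fileName = "bigFile.gz"
chunk_size = 100 * 1024 * 1024  # assumed 100 MB per part; every part except the last must be >= 5 MB

s3 = boto3.client('s3')

# 1. Initiate the multipart upload and remember its UploadId
mpu = s3.create_multipart_upload(Bucket=bucket, Key=key)
upload_id = mpu['UploadId']

# 2. Upload the file chunk by chunk, collecting each part's ETag and number
parts = []
part_number = 1
with open(os.path.join(path, fileName), 'rb') as f:
    while True:
        data = f.read(chunk_size)
        if not data:
            break
        resp = s3.upload_part(
            Bucket=bucket,
            Key=key,
            PartNumber=part_number,
            UploadId=upload_id,
            Body=data,
        )
        parts.append({'ETag': resp['ETag'], 'PartNumber': part_number})
        part_number += 1

# 3. Tell S3 to stitch the parts together into the final object
s3.complete_multipart_upload(
    Bucket=bucket,
    Key=key,
    UploadId=upload_id,
    MultipartUpload={'Parts': parts},
)

If I read it right, the ETag returned by each upload_part call is what the undefined `part` in the found example was supposed to hold, since complete_multipart_upload needs the full list of (PartNumber, ETag) pairs.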