小编mcu*_*ere的帖子

将RDD解复用到多个ORC表上

我正在尝试将存储在S3中的数据作为JSON-per-line文本文件转换为结构化的柱状格式,如ORC或Parquet on S3.

源文件包含多个方案的数据(例如,HTTP请求,HTTP响应......),需要将其解析为正确类型的不同Spark数据帧.

示例模式:

  val Request = StructType(Seq(
    StructField("timestamp", TimestampType, nullable=false),
    StructField("requestId", LongType),
    StructField("requestMethod", StringType),
    StructField("scheme", StringType),
    StructField("host", StringType),
    StructField("headers", MapType(StringType, StringType, valueContainsNull=false)),
    StructField("path", StringType),
    StructField("sessionId", StringType),
    StructField("userAgent", StringType)
  ))

  val Response = StructType(Seq(
    StructField("timestamp", TimestampType, nullable=false),
    StructField("requestId", LongType),
    StructField("contentType", StringType),
    StructField("contentLength", IntegerType),
    StructField("statusCode", StringType),
    StructField("headers", MapType(keyType=StringType, valueType=StringType, valueContainsNull=false)),
    StructField("responseDuration", DoubleType),
    StructField("sessionId", StringType)
  ))
Run Code Online (Sandbox Code Playgroud)

我得到了那个部分工作正常,但是试图尽可能有效地将数据写回S3似乎是一个问题atm.

我尝试了3种方法:

  1. 来自silex项目的muxPartitions
  2. 缓存已解析的S3输入并多次循环它
  3. 使每个方案类型成为RDD的单独分区

在第一种情况下,JVM内存不足,而在第二种情况下,机器的磁盘空间不足.

第三个我还没有经过彻底的测试,但这似乎并不能有效地利用处理能力(因为只有一个集群节点(这个特定分区所在的节点)实际上会将数据写回S3) .

相关代码:

val allSchemes = Schemes.all().keys.toArray

if (false) {
  import com.realo.warehouse.multiplex.implicits._

  val input = readRawFromS3(inputPrefix) // returns …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql

6
推荐指数
1
解决办法
313
查看次数

标签 统计

apache-spark ×1

apache-spark-sql ×1