Demultiplexing an RDD onto multiple ORC tables

mcu*_*ere 6 apache-spark apache-spark-sql

I am trying to convert data stored on S3 as JSON-per-line text files into a structured, columnar format such as ORC or Parquet, also on S3.

The source files contain data for several schemes (e.g. HTTP requests, HTTP responses, ...), which need to be parsed into separate Spark DataFrames of the correct type.

Example schemas:

  import org.apache.spark.sql.types._

  val Request = StructType(Seq(
    StructField("timestamp", TimestampType, nullable=false),
    StructField("requestId", LongType),
    StructField("requestMethod", StringType),
    StructField("scheme", StringType),
    StructField("host", StringType),
    StructField("headers", MapType(StringType, StringType, valueContainsNull=false)),
    StructField("path", StringType),
    StructField("sessionId", StringType),
    StructField("userAgent", StringType)
  ))

  val Response = StructType(Seq(
    StructField("timestamp", TimestampType, nullable=false),
    StructField("requestId", LongType),
    StructField("contentType", StringType),
    StructField("contentLength", IntegerType),
    StructField("statusCode", StringType),
    StructField("headers", MapType(keyType=StringType, valueType=StringType, valueContainsNull=false)),
    StructField("responseDuration", DoubleType),
    StructField("sessionId", StringType)
  ))
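For context, a minimal sketch of how such mixed JSON-per-line input could be split and parsed per scheme with Spark SQL's get_json_object and from_json; the S3 path, the logType field name and the "request" value are assumptions made purely for illustration:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json, get_json_object}

val spark = SparkSession.builder().appName("json-demux").getOrCreate()

// Raw JSON-per-line input; the bucket and prefix are placeholders.
val raw = spark.read.text("s3a://my-bucket/logs/")

// Keep only the lines of one scheme, then parse them with that scheme's StructType.
// The "logType" field name and the "request" value are assumed for illustration.
val requests = raw
  .filter(get_json_object(col("value"), "$.logType") === "request")
  .select(from_json(col("value"), Request).as("parsed"))
  .select("parsed.*")

Filtering like this once per scheme re-reads the raw input for every scheme, which is essentially the efficiency problem described below.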

I have that part working fine, but writing the data back to S3 as efficiently as possible seems to be the problem at the moment.

I have tried 3 approaches:

  1. muxPartitions from the silex project
  2. caching the parsed S3 input and looping over it multiple times
  3. making each scheme type a separate partition of the RDD

In the first case the JVM ran out of memory, and in the second the machines ran out of disk space.

The third one I have not tested thoroughly yet, but it does not seem to make efficient use of the available processing power (only a single cluster node, the one on which the given partition resides, would actually be writing the data back to S3).

Relevant code:

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.Partitioner
import org.apache.spark.storage.StorageLevel

val allSchemes = Schemes.all().keys.toArray

if (false) {
  import com.realo.warehouse.multiplex.implicits._

  val input = readRawFromS3(inputPrefix) // returns RDD[Row]
    .flatMuxPartitions(allSchemes.length, data => {
      val buffers = Vector.tabulate(allSchemes.length) { j => ArrayBuffer.empty[Row] }
      data.foreach {
        logItem => {
          val schemeIndex = allSchemes.indexOf(logItem.logType)
          if (schemeIndex > -1) {
            buffers(schemeIndex).append(logItem.row)
          }
        }
      }
      buffers
    })

  allSchemes.zipWithIndex.foreach {
    case (schemeName, index) =>
      val rdd = input(index)

      writeColumnarToS3(rdd, schemeName)
  }
} else if (false) {
  // Naive approach
  val input = readRawFromS3(inputPrefix) // returns RDD[Row]
    .persist(StorageLevel.MEMORY_AND_DISK)

  allSchemes.foreach {
    schemeName =>
      val rdd = input
        .filter(x => x.logType == schemeName)
        .map(x => x.row)

      writeColumnarToS3(rdd, schemeName)
  }

  input.unpersist()
} else {
  class CustomPartitioner extends Partitioner {
    override def numPartitions: Int = allSchemes.length
    override def getPartition(key: Any): Int = allSchemes.indexOf(key.asInstanceOf[String])
  }

  val input = readRawFromS3(inputPrefix)
    .map(x => (x.logType, x.row))
    .partitionBy(new CustomPartitioner())
    .map { case (logType, row) => row }
    .persist(StorageLevel.MEMORY_AND_DISK)

  allSchemes.zipWithIndex.foreach {
    case (schemeName, index) =>
      val rdd = input
        .mapPartitionsWithIndex(
          (i, iter) => if (i == index) iter else Iterator.empty,
          preservesPartitioning = true
        )

      writeColumnarToS3(rdd, schemeName)
  }

  input.unpersist()
}
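The writeColumnarToS3 helper used above is not shown in the question; a plausible minimal sketch, assuming it converts the RDD[Row] back to a DataFrame with the scheme's StructType and writes ORC to a per-scheme prefix (the lookup via Schemes.all() and the output path are assumptions):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.StructType

// Hypothetical reconstruction of the unshown helper, for illustration only.
def writeColumnarToS3(rdd: RDD[Row], schemeName: String): Unit = {
  val spark = SparkSession.builder().getOrCreate()
  // Assumes Schemes.all() maps a scheme name to its StructType.
  val schema: StructType = Schemes.all()(schemeName)

  spark.createDataFrame(rdd, schema)
    .write
    .mode("append")
    .orc(s"s3a://my-bucket/warehouse/$schemeName/") // placeholder output location
}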

Conceptually, I think the code should have one output DStream per scheme type, and the input RDD should pick'n'place each processed item onto the correct DStream (with batching for better throughput).

Does anyone have any pointers on how to implement this? And/or is there a better way of approaching this problem?

mcu*_*ere 0

This is what I eventually came up with:

I use a custom partitioner to partition the data based on its scheme plus the hashcode of the row.

The reasoning here is that we want to be able to process only certain partitions, yet still allow all nodes to participate (for performance reasons). So instead of spreading the data over a single partition per scheme, we spread it over X partitions (in this example, X is the number of nodes times 2).

Then, for each scheme, we prune the partitions we do not need, and only process the ones we do.

Code example:

import org.joda.time.ReadableInstant

import org.apache.spark.Partitioner
import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
import org.apache.spark.storage.StorageLevel

def process(date : ReadableInstant, schemesToProcess : Array[String]) = {
  // Tweak this based on your use case
  val DefaultNumberOfStoragePartitions = spark.sparkContext.defaultParallelism * 2

  class CustomPartitioner extends Partitioner {
    override def numPartitions: Int = schemesToProcess.length * DefaultNumberOfStoragePartitions
    override def getPartition(key: Any): Int = {
      // This is tightly coupled with how `input` gets transformed below
      val (logType, rowHashCode) = key.asInstanceOf[(String, Int)]
      (schemesToProcess.indexOf(logType) * DefaultNumberOfStoragePartitions) + Utils.nonNegativeMod(rowHashCode, DefaultNumberOfStoragePartitions)
    }

    /**
      * Internal helper function to retrieve all partition indices for the given key
      * @param key input key
      * @return
      */
    private def getPartitions(key: String): Seq[Int] = {
      val index = schemesToProcess.indexOf(key) * DefaultNumberOfStoragePartitions
      index until (index + DefaultNumberOfStoragePartitions)
    }

    /**
      * Returns an RDD which only traverses the partitions for the given key
      * @param rdd base RDD
      * @param key input key
      * @return
      */
    def filterRDDForKey[T](rdd: RDD[T], key: String): RDD[T] = {
      val partitions = getPartitions(key).toSet
      PartitionPruningRDD.create(rdd, x => partitions.contains(x))
    }
  }

  val partitioner = new CustomPartitioner()
  val input = readRawFromS3(date)
    .map(x => ((x.logType, x.row.hashCode), x.row))
    .partitionBy(partitioner)
    .persist(StorageLevel.MEMORY_AND_DISK_SER)

  // Initial stage: caches the processed data + gets an enumeration of all schemes in this RDD
  val schemesInRdd = input
    .map(_._1._1)
    .distinct()
    .collect()

  // Remaining stages: for each scheme, write it out to S3 as ORC
  schemesInRdd.zipWithIndex.foreach {
    case (schemeName, index) =>
      val rdd = partitioner.filterRDDForKey(input, schemeName)
        .map(_._2)
        .coalesce(DefaultNumberOfStoragePartitions)

      writeColumnarToS3(rdd, schemeName)
  }

  input.unpersist()
}
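A possible invocation of the function above; the joda-time instant and the scheme names are placeholders, not values from the original post:

import org.joda.time.DateTime

// Process yesterday's data for two example schemes.
process(DateTime.now().minusDays(1), Array("request", "response"))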