Tags: apache-spark, apache-spark-sql
I am trying to convert data stored in S3 as JSON-per-line text files into a structured, columnar format such as ORC or Parquet, also on S3.
The source files contain data for several schemes (e.g. HTTP request, HTTP response, ...), which needs to be parsed into separate Spark DataFrames of the correct types.
Example schemas:
import org.apache.spark.sql.types._

val Request = StructType(Seq(
  StructField("timestamp", TimestampType, nullable = false),
  StructField("requestId", LongType),
  StructField("requestMethod", StringType),
  StructField("scheme", StringType),
  StructField("host", StringType),
  StructField("headers", MapType(StringType, StringType, valueContainsNull = false)),
  StructField("path", StringType),
  StructField("sessionId", StringType),
  StructField("userAgent", StringType)
))

val Response = StructType(Seq(
  StructField("timestamp", TimestampType, nullable = false),
  StructField("requestId", LongType),
  StructField("contentType", StringType),
  StructField("contentLength", IntegerType),
  StructField("statusCode", StringType),
  StructField("headers", MapType(keyType = StringType, valueType = StringType, valueContainsNull = false)),
  StructField("responseDuration", DoubleType),
  StructField("sessionId", StringType)
))
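For reference, a minimal DataFrame-level sketch of the end-to-end conversion looks roughly like this. It assumes every JSON line carries a top-level logType field naming its scheme, the scheme-name strings and output prefix are placeholders, and it is essentially the naive filter-per-scheme variant described further down rather than the code I actually run:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json, get_json_object}

// Rough sketch only: read the raw lines once, then carve out one DataFrame per
// scheme by filtering on the (assumed) top-level "logType" field, parsing each
// line with that scheme's schema, and writing the result out as ORC.
def convertNaively(spark: SparkSession, inputPrefix: String, outputPrefix: String): Unit = {
  val raw = spark.read.text(inputPrefix).cache() // single column named "value"

  // The scheme-name strings here are assumptions for illustration.
  val schemas = Map("request" -> Request, "response" -> Response)
  schemas.foreach { case (schemeName, schema) =>
    raw
      .filter(get_json_object(col("value"), "$.logType") === schemeName)
      .select(from_json(col("value"), schema).as("data"))
      .select("data.*")
      .write.mode("overwrite")
      .orc(s"$outputPrefix/$schemeName/") // e.g. s3a://bucket/warehouse/request/
  }

  raw.unpersist()
}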
I got that part working fine, but writing the data back to S3 as efficiently as possible seems to be the problem at the moment.
I tried three approaches (the three branches in the code below):
With the first, the JVM ran out of memory; with the second, the machine ran out of disk space.
The third I have not tested thoroughly yet, but it does not seem to make efficient use of the available processing power, since only a single cluster node (the one on which that particular partition resides) would actually be writing the data back to S3.
Relevant code:
val allSchemes = Schemes.all().keys.toArray

if (false) {
  // Approach 1: multiplex the input RDD into one RDD per scheme in a single pass.
  import com.realo.warehouse.multiplex.implicits._

  val input = readRawFromS3(inputPrefix) // returns RDD[Row]
    .flatMuxPartitions(allSchemes.length, data => {
      val buffers = Vector.tabulate(allSchemes.length) { j => ArrayBuffer.empty[Row] }
      data.foreach { logItem =>
        val schemeIndex = allSchemes.indexOf(logItem.logType)
        if (schemeIndex > -1) {
          buffers(schemeIndex).append(logItem.row)
        }
      }
      buffers
    })

  allSchemes.zipWithIndex.foreach {
    case (schemeName, index) =>
      val rdd = input(index)
      writeColumnarToS3(rdd, schemeName)
  }
} else if (false) {
  // Approach 2: naive approach - cache the input and filter it once per scheme.
  val input = readRawFromS3(inputPrefix) // returns RDD[Row]
    .persist(StorageLevel.MEMORY_AND_DISK)

  allSchemes.foreach {
    schemeName =>
      val rdd = input
        .filter(x => x.logType == schemeName)
        .map(x => x.row)
      writeColumnarToS3(rdd, schemeName)
  }

  input.unpersist()
} else {
  // Approach 3: repartition by scheme, then select each scheme's single partition.
  class CustomPartitioner extends Partitioner {
    override def numPartitions: Int = allSchemes.length
    override def getPartition(key: Any): Int = allSchemes.indexOf(key.asInstanceOf[String])
  }

  val input = readRawFromS3(inputPrefix)
    .map(x => (x.logType, x.row))
    .partitionBy(new CustomPartitioner())
    .map { case (logType, row) => row }
    .persist(StorageLevel.MEMORY_AND_DISK)

  allSchemes.zipWithIndex.foreach {
    case (schemeName, index) =>
      val rdd = input
        .mapPartitionsWithIndex(
          (i, iter) => if (i == index) iter else Iterator.empty,
          preservesPartitioning = true
        )
      writeColumnarToS3(rdd, schemeName)
  }

  input.unpersist()
}
Conceptually, I think the code should have one output stream per scheme type, and the input RDD should pick-and-place each processed item onto the correct one (with batching for better throughput).
Does anyone have any pointers on how to implement this? And/or is there a better way of approaching the problem?
This is what I eventually came up with:
I use a custom partitioner that partitions the data based on its scheme plus the hashcode of the row.
The reasoning is that we want to be able to process only certain partitions, yet still allow all nodes to participate (for performance reasons). So instead of spreading each scheme's data over 1 partition, we spread it over X partitions (in this example, X is the number of nodes times 2).
Then, for each scheme, we prune the partitions we do not need, so we only process the ones we do.
Code example:
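To make the index arithmetic concrete, here is a small worked example (the numbers are made up; the schemes are assumed to be ordered request, response, with X = 8 storage partitions per scheme):

// Assumed setup: 2 schemes, X = 8 storage partitions per scheme => 16 partitions in total.
val exampleSchemes = Array("request", "response")
val storagePartitions = 8

// A "response" row whose hashCode is 21 falls in bucket 21 mod 8 = 5,
// so it lands in partition 1 * 8 + 5 = 13.
val partition = exampleSchemes.indexOf("response") * storagePartitions +
  (((21 % storagePartitions) + storagePartitions) % storagePartitions)
// partition == 13; pruning for "response" then keeps partitions 8 until 16.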
def process(date: ReadableInstant, schemesToProcess: Array[String]) = {
  // Tweak this based on your use case
  val DefaultNumberOfStoragePartitions = spark.sparkContext.defaultParallelism * 2

  class CustomPartitioner extends Partitioner {
    override def numPartitions: Int = schemesToProcess.length * DefaultNumberOfStoragePartitions

    override def getPartition(key: Any): Int = {
      // This is tightly coupled with how `input` gets transformed below
      val (logType, rowHashCode) = key.asInstanceOf[(String, Int)]
      (schemesToProcess.indexOf(logType) * DefaultNumberOfStoragePartitions) + Utils.nonNegativeMod(rowHashCode, DefaultNumberOfStoragePartitions)
    }

    /**
      * Internal helper function to retrieve all partition indices for the given key
      * @param key input key
      * @return
      */
    private def getPartitions(key: String): Seq[Int] = {
      val index = schemesToProcess.indexOf(key) * DefaultNumberOfStoragePartitions
      index until (index + DefaultNumberOfStoragePartitions)
    }

    /**
      * Returns an RDD which only traverses the partitions for the given key
      * @param rdd base RDD
      * @param key input key
      * @return
      */
    def filterRDDForKey[T](rdd: RDD[T], key: String): RDD[T] = {
      val partitions = getPartitions(key).toSet
      PartitionPruningRDD.create(rdd, x => partitions.contains(x))
    }
  }

  val partitioner = new CustomPartitioner()
  val input = readRawFromS3(date)
    .map(x => ((x.logType, x.row.hashCode), x.row))
    .partitionBy(partitioner)
    .persist(StorageLevel.MEMORY_AND_DISK_SER)

  // Initial stage: caches the processed data + gets an enumeration of all schemes in this RDD
  val schemesInRdd = input
    .map(_._1._1)
    .distinct()
    .collect()

  // Remaining stages: for each scheme, write it out to S3 as ORC
  schemesInRdd.zipWithIndex.foreach {
    case (schemeName, index) =>
      val rdd = partitioner.filterRDDForKey(input, schemeName)
        .map(_._2)
        .coalesce(DefaultNumberOfStoragePartitions)
      writeColumnarToS3(rdd, schemeName)
  }

  input.unpersist()
}
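For completeness, invoking it looks something like this (the joda-time import and the way the scheme list is obtained are assumptions based on the signature and on the Schemes.all() call used earlier):

import org.joda.time.DateTime

// Hypothetical call: process yesterday's data for every known scheme.
val yesterday = DateTime.now().minusDays(1)
process(yesterday, Schemes.all().keys.toArray)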