我有一个 HDFS 文件夹,其中包含两个 250MB 的 parquet 文件。hadoop df 块大小设置为 128MB。有以下代码:
JavaSparkContext sparkContext = new JavaSparkContext();
SQLContext sqlContext = new SQLContext(sparkContext);
DataFrame dataFrame = sqlContext.read().parquet("hdfs:////user/test/parquet-folder");
LOGGER.info("Nr. of rdd partitions: {}", dataFrame.rdd().getNumPartitions());
sparkContext.close();
Run Code Online (Sandbox Code Playgroud)
我使用spark.executor.instances=3 和spark.executor.cores=4 在集群上运行它。我可以看到 parquet 文件的读取分为 3 个执行程序 X 4 个核心 = 12 个任务:
spark.SparkContext: Starting job: parquet at VerySimpleJob.java:25
scheduler.DAGScheduler: Got job 0 (parquet at VerySimpleJob.java:25) with 12 output partitions
Run Code Online (Sandbox Code Playgroud)
但是,当我获取数据帧 RDD(或使用 toJavaRDD() 创建 RDD)调用时,我只得到 4 个分区。这是否由 hdfs 块大小控制 - 每个文件 2 个块,因此有 4 个分区?
为什么这与镶木地板(父级?)操作的分区数量不匹配?
我有一个镶木地板数据框,具有以下结构:
我必须将 480 个特征列中的每一列替换为其相应的加权移动平均值,窗口为 250。最初,我尝试使用以下简单代码对单个列执行此操作:
var data = sparkSession.read.parquet("s3://data-location")
var window = Window.rowsBetween(-250, Window.currentRow - 1).partitionBy("ID").orderBy("DATE")
data.withColumn("Feature_1", col("Feature_1").divide(avg("Feature_1").over(window))).write.parquet("s3://data-out")
Run Code Online (Sandbox Code Playgroud)
输入数据包含 2000 万行,每个 ID 大约有 4-5000 个关联日期。我已在 AWS EMR 集群(m4.xlarge 实例)上运行此程序,其中一列的结果如下:
我调整了以下设置,希望能缩短总时间:
第二个有助于防止日志中出现的一些溢出,但对实际性能没有任何帮助。
我不明白为什么仅仅 2000 万条记录就需要这么长时间。我知道,为了计算加权移动平均值,它需要执行 20 …
我有一个从csv文件读取的ML数据框。它包含三种类型的列:
ID时间戳记Feature1 Feature2 ... Feature_n
其中n是〜500(在ML术语中为500个特征)。数据集中的行总数约为1.6亿。
由于这是先前完全连接的结果,因此许多功能没有设置值。
我的目标是运行一个“填充”功能(fillna样式为python pandas),其中每个空功能值都将根据ID和Date设置为该列的先前可用值。
我正在尝试通过以下spark 2.2.1代码实现这一目标:
val rawDataset = sparkSession.read.option("header", "true").csv(inputLocation)
val window = Window.partitionBy("ID").orderBy("DATE").rowsBetween(-50000, -1)
val columns = Array(...) //first 30 columns initially, just to see it working
val rawDataSetFilled = columns.foldLeft(rawDataset) { (originalDF, columnToFill) =>
originalDF.withColumn(columnToFill, coalesce(col(columnToFill), last(col(columnToFill), ignoreNulls = true).over(window)))
}
Run Code Online (Sandbox Code Playgroud)
我正在Spark 2.2.1上的Amazon EMR的4 m4.large实例上运行此作业。并启用了动态分配。
作业运行了2小时以上而未完成。
我在代码级别上做错了吗?给定数据和实例的大小,我认为它应该在合理的时间内完成?而且我什至没有尝试使用全部500列,只有30列!
在容器日志中,我看到的都是许多这样的日志:
INFO codegen.CodeGenerator:166.677493毫秒内生成的代码
INFO execute.ExternalAppendOnlyUnsafeRowArray:已达到4096行的溢出阈值,切换到org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
我尝试将参数spark.sql.windowExec.buffer.spill.threshold设置为更大的值,而没有任何影响。还有其他我应该知道的设置吗?那两条线是我在任何容器日志中看到的唯一一行。
在Ganglia中,我看到大多数CPU内核在完全使用情况附近达到峰值,但是内存使用情况低于可用的最大值。所有执行者均已分配并正在工作。
我有一个时间序列定价数据的数据框,带有 ID、日期和价格。
我需要计算价格列的指数移动平均线,并将其作为新列添加到数据框中。
我之前一直在使用 Spark 的窗口函数,它看起来很适合这个用例,但考虑到 EMA 的公式:
EMA: {Price - EMA(previous day)} x multiplier + EMA(previous day)
Run Code Online (Sandbox Code Playgroud)
在哪里
multiplier = (2 / (Time periods + 1)) //let's assume Time period is 10 days for now
Run Code Online (Sandbox Code Playgroud)
我对如何访问列中的先前计算值感到有些困惑,同时实际上对列进行了窗口化。使用简单的移动平均线,这很简单,因为您需要做的就是在平均窗口中的元素的同时计算一个新列:
var window = Window.partitionBy("ID").orderBy("Date").rowsBetween(-windowSize, Window.currentRow)
dataFrame.withColumn(avg(col("Price")).over(window).alias("SMA"))
Run Code Online (Sandbox Code Playgroud)
但似乎 EMA 有点复杂,因为在每一步我都需要以前的计算值。
我还查看了 Pyspark中的加权移动平均线,但我需要一种适用于 Spark/Scala 和 10 天或 30 天 EMA 的方法。
有任何想法吗?