因此,在面向对象的世界中花费了多年的代码重用,设计模式和最佳实践总是被考虑在内,我发现自己在Spark世界中的代码组织和代码重用方面有些挣扎.
如果我尝试以可重用的方式编写代码,它几乎总是带来性能成本,我最终会将其重写为适合我的特定用例的最佳代码.这个常量"为这个特定用例编写最佳内容"也会影响代码组织,因为当"它们真的属于一个整体"时,将代码拆分成不同的对象或模块是困难的,因此我最终只得到很少的"上帝"对象包含长复杂变换链.事实上,我经常认为,如果我在面向对象世界工作时看到我现在正在写的大部分Spark代码,我会畏缩并将其视为"意大利面条代码".
我上网试图找到某种等同于面向对象世界的最佳实践,但没有太多运气.我可以找到一些函数式编程的"最佳实践",但Spark只增加了一个额外的层,因为性能是这里的一个主要因素.
所以我的问题是,你们中的任何人都有Spark专家发现了一些你可以推荐的编写Spark代码的最佳实践吗?
编辑
正如评论中所写,我实际上并没有希望有人就如何解决这个问题发表答案,而是我希望这个社区中的某个人遇到一些Martin Fowler类型,他曾在某处写过som文章或博客帖子关于如何解决Spark世界中代码组织的问题.
@DanielDarabos建议我举一个代码组织和性能相互矛盾的例子.虽然我发现我在日常工作中经常遇到这方面的问题,但我觉得把它归结为一个很好的最小例子有点困难;)但我会尝试.
在面向对象的世界里,我是单一责任原则的忠实粉丝,所以我要确保我的方法只对一件事负责.它使它们可重复使用并且易于测试.因此,如果我不得不计算列表中某些数字的总和(匹配某些标准)并且我必须计算相同数字的平均值,我肯定会创建两个方法 - 一个计算总和,一个计算平均值.像这样:
def main(implicit args: Array[String]): Unit = {
val list = List(("DK", 1.2), ("DK", 1.4), ("SE", 1.5))
println("Summed weights for DK = " + summedWeights(list, "DK")
println("Averaged weights for DK = " + averagedWeights(list, "DK")
}
def summedWeights(list: List, country: String): Double = {
list.filter(_._1 == country).map(_._2).sum
}
def averagedWeights(list: List, country: String): Double = {
val filteredByCountry = list.filter(_._1 == country)
filteredByCountry.map(_._2).sum/ filteredByCountry.length
}
Run Code Online (Sandbox Code Playgroud)
我当然可以继续尊重Spark中的SRP:
def main(implicit …Run Code Online (Sandbox Code Playgroud) 我总是reduceByKey在需要在RDD中对数据进行分组时使用,因为它在对数据进行混洗之前执行地图侧减少,这通常意味着更少的数据被改组,因此我获得了更好的性能.即使地图侧缩减功能收集所有值并且实际上并没有减少数据量,我仍然使用reduceByKey,因为我假设性能reduceByKey永远不会差groupByKey.但是,我想知道这个假设是否正确,或者确实存在groupByKey应该首选的情况?
我有一个DataFrame我需要根据特定的分区写入S3.代码如下所示:
dataframe
.write
.mode(SaveMode.Append)
.partitionBy("year", "month", "date", "country", "predicate")
.parquet(outputPath)
Run Code Online (Sandbox Code Playgroud)
将partitionBy数据拆分成相当多的文件夹(~400),每个文件夹只有一点点数据(~1GB).这就出现了问题 - 因为默认值为spark.sql.shuffle.partitions200,每个文件夹中的1GB数据被分成200个小的镶木地板文件,导致总共写入大约80000个镶木地板文件.由于多种原因,这不是最佳的,我想避免这种情况.
我当然可以设置spark.sql.shuffle.partitions一个更小的数字,比如说10,但据我所知,这个设置也控制了连接和聚合中shuffle的分区数,所以我真的不想改变它.
有谁知道是否有另一种方法来控制写入多少文件?
我试图用spark处理kinesis流数据.我用2个分片开始我的流.它工作正常,直到我手动拆分我的一个分片.之后,我的程序在新数据到达时崩溃.
这是错误消息:
ERROR ShutdownTask: Application exception.
java.lang.IllegalArgumentException: Application didn't checkpoint at end of shard shardId-000000000001
Run Code Online (Sandbox Code Playgroud)
我该如何在程序中处理已关闭分片的检查点?
编辑:添加示例代码到这个问题(这也与我高度相关).
示例代码:
/* Create a streaming context */
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(sc, Seconds(2) )
ssc.checkpoint(checkpointDirectory)
val kinesisStreams = KinesisUtils.createStream(ssc, appName, streamName, endpointUrl,awsRegion,InitialPositionInStream.LATEST,Seconds(1),StorageLevel.MEMORY_ONLY)
/* Do the processing */
kinesisStreams.foreachRDD(rdd => ...)
ssc
}
/* First, recover the context; otherwise, create a new context */
val ssc = StreamingContext.getOrCreate(checkpointDirectory,functionToCreateContext _ )
ssc.start()
ssc.awaitTermination()
Run Code Online (Sandbox Code Playgroud) 我的行RDD看起来像这样:
Array[org.apache.spark.sql.Row] = Array([1,[example1,WrappedArray([**Standford,Organisation,NNP], [is,O,VP], [good,LOCATION,ADP**])]])
Run Code Online (Sandbox Code Playgroud)
我从转换数据帧到rdd得到了这个,数据帧架构是:
root
|-- article_id: long (nullable = true)
|-- sentence: struct (nullable = true)
| |-- sentence: string (nullable = true)
| |-- attributes: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- tokens: string (nullable = true)
| | | |-- ner: string (nullable = true)
| | | |-- pos: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)
现在如何在行rdd中访问元素,在数据帧中我可以使用df.select("sentence").我期待访问像stanford /其他嵌套元素的元素.