小编Gle*_*olt的帖子

Spark代码组织和最佳实践

因此,在面向对象的世界中花费了多年的代码重用,设计模式和最佳实践总是被考虑在内,我发现自己在Spark世界中的代码组织和代码重用方面有些挣扎.

如果我尝试以可重用的方式编写代码,它几乎总是带来性能成本,我最终会将其重写为适合我的特定用例的最佳代码.这个常量"为这个特定用例编写最佳内容"也会影响代码组织,因为当"它们真的属于一个整体"时,将代码拆分成不同的对象或模块是困难的,因此我最终只得到很少的"上帝"对象包含长复杂变换链.事实上,我经常认为,如果我在面向对象世界工作时看到我现在正在写的大部分Spark代码,我会畏缩并将其视为"意大利面条代码".

我上网试图找到某种等同于面向对象世界的最佳实践,但没有太多运气.我可以找到一些函数式编程的"最佳实践",但Spark只增加了一个额外的层,因为性能是这里的一个主要因素.

所以我的问题是,你们中的任何人都有Spark专家发现了一些你可以推荐的编写Spark代码的最佳实践吗?

编辑

正如评论中所写,我实际上并没有希望有人就如何解决这个问题发表答案,而是我希望这个社区中的某个人遇到一些Martin Fowler类型,他曾在某处写过som文章或博客帖子关于如何解决Spark世界中代码组织的问题.

@DanielDarabos建议我举一个代码组织和性能相互矛盾的例子.虽然我发现我在日常工作中经常遇到这方面的问题,但我觉得把它归结为一个很好的最小例子有点困难;)但我会尝试.

在面向对象的世界里,我是单一责任原则的忠实粉丝,所以我要确保我的方法只对一件事负责.它使它们可重复使用并且易于测试.因此,如果我不得不计算列表中某些数字的总和(匹配某些标准)并且我必须计算相同数字的平均值,我肯定会创建两个方法 - 一个计算总和,一个计算平均值.像这样:

def main(implicit args: Array[String]): Unit = {
  val list = List(("DK", 1.2), ("DK", 1.4), ("SE", 1.5))

  println("Summed weights for DK = " + summedWeights(list, "DK")
  println("Averaged weights for DK = " + averagedWeights(list, "DK")
}

def summedWeights(list: List, country: String): Double = {
  list.filter(_._1 == country).map(_._2).sum
}

def averagedWeights(list: List, country: String): Double = {
  val filteredByCountry = list.filter(_._1 == country) 
  filteredByCountry.map(_._2).sum/ filteredByCountry.length
}
Run Code Online (Sandbox Code Playgroud)

我当然可以继续尊重Spark中的SRP:

def main(implicit …
Run Code Online (Sandbox Code Playgroud)

functional-programming code-organization apache-spark

65
推荐指数
1
解决办法
6675
查看次数

groupByKey是否比reduceByKey更受欢迎

我总是reduceByKey在需要在RDD中对数据进行分组时使用,因为它在对数据进行混洗之前执行地图侧减少,这通常意味着更少的数据被改组,因此我获得了更好的性能.即使地图侧缩减功能收集所有值并且实际上并没有减少数据量,我仍然使用reduceByKey,因为我假设性能reduceByKey永远不会差groupByKey.但是,我想知道这个假设是否正确,或者确实存在groupByKey应该首选的情况?

apache-spark rdd

14
推荐指数
2
解决办法
1万
查看次数

如何控制使用partitionBy时生成的镶木地板文件数量

我有一个DataFrame我需要根据特定的分区写入S3.代码如下所示:

dataframe
  .write
  .mode(SaveMode.Append)
  .partitionBy("year", "month", "date", "country", "predicate")
  .parquet(outputPath)
Run Code Online (Sandbox Code Playgroud)

partitionBy数据拆分成相当多的文件夹(~400),每个文件夹只有一点点数据(~1GB).这就出现了问题 - 因为默认值为spark.sql.shuffle.partitions200,每个文件夹中的1GB数据被分成200个小的镶木地板文件,导致总共写入大约80000个镶木地板文件.由于多种原因,这不是最佳的,我想避免这种情况.

我当然可以设置spark.sql.shuffle.partitions一个更小的数字,比如说10,但据我所知,这个设置也控制了连接和聚合中shuffle的分区数,所以我真的不想改变它.

有谁知道是否有另一种方法来控制写入多少文件?

apache-spark spark-dataframe

8
推荐指数
1
解决办法
4886
查看次数

在kinesis流上运行火花时重新磨光

我试图用spark处理kinesis流数据.我用2个分片开始我的流.它工作正常,直到我手动拆分我的一个分片.之后,我的程序在新数据到达时崩溃.

这是错误消息:

ERROR ShutdownTask: Application exception.

java.lang.IllegalArgumentException: Application didn't checkpoint at end of shard shardId-000000000001
Run Code Online (Sandbox Code Playgroud)

我该如何在程序中处理已关闭分片的检查点?

编辑:添加示例代码到这个问题(这也与我高度相关).

示例代码:

/* Create a streaming context */
def functionToCreateContext(): StreamingContext = {

  val ssc = new StreamingContext(sc, Seconds(2) )
  ssc.checkpoint(checkpointDirectory)

  val kinesisStreams = KinesisUtils.createStream(ssc, appName, streamName, endpointUrl,awsRegion,InitialPositionInStream.LATEST,Seconds(1),StorageLevel.MEMORY_ONLY)

  /* Do the processing */
  kinesisStreams.foreachRDD(rdd => ...)

  ssc
}

/* First, recover the context; otherwise, create a new context */
val ssc = StreamingContext.getOrCreate(checkpointDirectory,functionToCreateContext _ )

ssc.start()
ssc.awaitTermination()
Run Code Online (Sandbox Code Playgroud)

apache-spark amazon-kinesis spark-streaming

5
推荐指数
0
解决办法
385
查看次数

如何在SCALA中的Row RDD中访问elemens

我的行RDD看起来像这样:

Array[org.apache.spark.sql.Row] = Array([1,[example1,WrappedArray([**Standford,Organisation,NNP], [is,O,VP], [good,LOCATION,ADP**])]])
Run Code Online (Sandbox Code Playgroud)

我从转换数据帧到rdd得到了这个,数据帧架构是:

root
 |-- article_id: long (nullable = true)
 |-- sentence: struct (nullable = true)
 |    |-- sentence: string (nullable = true)
 |    |-- attributes: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- tokens: string (nullable = true)
 |    |    |    |-- ner: string (nullable = true)
 |    |    |    |-- pos: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

现在如何在行rdd中访问元素,在数据帧中我可以使用df.select("sentence").我期待访问像stanford /其他嵌套元素的元素.

scala apache-spark

5
推荐指数
1
解决办法
5271
查看次数