如何制作良好的可重现的Apache Spark示例

pau*_*ult 55 dataframe apache-spark apache-spark-sql pyspark pyspark-sql

我花了相当多的时间阅读标签的一些问题,而且我经常发现海报没有提供足够的信息来真正理解他们的问题.我经常评论要求他们发布MCVE,但有时让他们显示一些样本输入/输出数据就像拔牙一样.例如:请参阅有关此问题的评论.

也许问题的一部分是人们只是不知道如何轻松地为火花数据帧创建MCVE.我认为将这个pandas问题的spark-dataframe版本作为可以链接的指南是有用的.

那么如何创造一个好的,可重复的例子呢?

pau*_*ult 50

提供可轻松重新创建的小样本数据.

至少,海报应在其数据框和代码上提供几行和列,以便轻松创建它.简单来说,我的意思是剪切和粘贴.尽可能小,以证明您的问题.


我有以下数据帧:

+-----+---+-----+----------+
|index|  X|label|      date|
+-----+---+-----+----------+
|    1|  1|    A|2017-01-01|
|    2|  3|    B|2017-01-02|
|    3|  5|    A|2017-01-03|
|    4|  7|    B|2017-01-04|
+-----+---+-----+----------+
Run Code Online (Sandbox Code Playgroud)

可以使用以下代码创建:

df = sqlCtx.createDataFrame(
    [
        (1, 1, 'A', '2017-01-01'),
        (2, 3, 'B', '2017-01-02'),
        (3, 5, 'A', '2017-01-03'),
        (4, 7, 'B', '2017-01-04')
    ],
    ('index', 'X', 'label', 'date')
)
Run Code Online (Sandbox Code Playgroud)

显示所需的输出.

询问您的具体问题并向我们展示您想要的输出.


如何创建一个新列 'is_divisible' 具有价值 'yes' ,如果当月的天 'date' 加7天整除值列'X', 'no' 以其他方式?

期望的输出:

+-----+---+-----+----------+------------+
|index|  X|label|      date|is_divisible|
+-----+---+-----+----------+------------+
|    1|  1|    A|2017-01-01|         yes|
|    2|  3|    B|2017-01-02|         yes|
|    3|  5|    A|2017-01-03|         yes|
|    4|  7|    B|2017-01-04|          no|
+-----+---+-----+----------+------------+
Run Code Online (Sandbox Code Playgroud)

解释如何获得输出.

详细解释如何获得所需的输出.它有助于显示示例计算.


例如,在第1行中,X = 1且日期= 2017-01-01.添加7天到目前为止2017-01-08.这个月的日期是8,因为8可以被1整除,答案是"是".

同样,对于最后一行X = 7和日期= 2017-01-04.将7添加到日期会产生11作为该月的日期.由于11%7不是0,答案是"不".


分享您现有的代码.

向我们展示您已完成或尝试过的内容,包括所有*代码,即使它不起作用.告诉我们您遇到的问题,如果收到错误,请提供错误消息.

(*您可以省略代码来创建spark上下文,但是您应该包括所有导入.)


我知道如何添加一个 加7天的新列,date 但是我无法将整个月的日期作为整数.

from pyspark.sql import functions as f
df.withColumn("next_week", f.date_add("date", 7))
Run Code Online (Sandbox Code Playgroud)

包括版本,导入和使用语法突出显示


对于性能调优帖子,请包括执行计划


解析火花输出文件

  • MaxU此答案中提供了有用的代码,以帮助将Spark输出文件解析为DataFrame.

其他说明.


hi-*_*zir 21

性能调整

如果问题与性能调整有关,请包含以下信息.

执行计划

最好包括扩展执行计划.在Python中:

df.explain(True) 
Run Code Online (Sandbox Code Playgroud)

在斯卡拉:

df.explain(true)
Run Code Online (Sandbox Code Playgroud)

或者扩展了统计数据的执行计划.在Python中:

print(df._jdf.queryExecution().stringWithStats())
Run Code Online (Sandbox Code Playgroud)

在斯卡拉:

df.queryExecution.stringWithStats
Run Code Online (Sandbox Code Playgroud)

模式和群集信息

  • mode- local,client`集群.
  • 集群管理器(如果适用) - 无(本地模式),独立,YARN,Mesos,Kubernetes.
  • 基本配置信息(内核数,执行程序内存).

时间信息

是相对的,特别是当您移植非分布式应用程序或您期望低延迟时.可以从Spark UI(sc.uiWebUrl)jobs或Spark REST UI 检索不同任务和阶段的确切时间.

使用标准名称作为上下文

使用每个上下文的已建立名称可以让我们快速重现问题.

  • sc- 为SparkContext.
  • sqlContext- 为SQLContext.
  • spark- 为SparkSession.

提供类型信息(Scala)

强大的类型推断是Scala最有用的功能之一,但它很难分析脱离上下文的代码.即使从上下文中显而易见的类型,最好注释变量.比较喜欢

val lines: RDD[String] = sc.textFile("path")
val words: RDD[String] = lines.flatMap(_.split(" "))
Run Code Online (Sandbox Code Playgroud)

过度

val lines = sc.textFile("path")
val words = lines.flatMap(_.split(" "))
Run Code Online (Sandbox Code Playgroud)

常用工具可以帮助您:

  • spark-shell/Scala shell

    使用 :t

    scala> val rdd = sc.textFile("README.md")
    rdd: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:24
    
    scala> :t rdd
    org.apache.spark.rdd.RDD[String]
    
    Run Code Online (Sandbox Code Playgroud)
  • InteliJ Idea

    使用Alt+=


des*_*aut 15

好问答 一些额外的建议:

包括您的Spark版本

Spark仍然在不断发展,尽管没有1.x时那么快.它总是(但特别是如果你使用的是较旧的版本)一个好主意,包括你的工作版本.就个人而言,我总是从我的答案有:

spark.version
# u'2.2.0'
Run Code Online (Sandbox Code Playgroud)

要么

sc.version
# u'2.2.0'
Run Code Online (Sandbox Code Playgroud)

包括你的Python版本也不是一个坏主意.


包括所有进口

如果您的问题不是严格关于Spark SQL和数据框架,例如,如果您打算在某些机器学习操作中使用您的数据框,请明确您的导入 - 请参阅此问题,只有在广泛交换之后才在OP中添加导入(现已删除)评论(并且结果证明这些错误的导入是问题的根本原因).

为什么这有必要?因为,例如,这个LDA

from pyspark.mllib.clustering import LDA
Run Code Online (Sandbox Code Playgroud)

不同的,从这个LDA:

from pyspark.ml.clustering import LDA
Run Code Online (Sandbox Code Playgroud)

第一个来自旧的,基于RDD的API(以前称为Spark MLlib),而第二个来自新的基于数据帧的API(Spark ML).


包括代码突出显示

好吧,我承认这是主观的:我相信PySpark的问题不应该被python 默认标记为; python事实上,tag会自动代码突出显示(我相信这是使用PySpark问题的人的主要原因).无论如何,如果你碰巧同意,并且你仍然想要一个很好的,突出显示的代码,只需包含相关的markdown指令:

<!-- language-all: lang-python -->

您的帖子中的某个位置,在您的第一个代码段之前.

[更新:我已经请求自动语法高亮显示pysparksparkr标签 - 最受欢迎的upvotes]


Max*_*axU 12

这个小帮助函数可能有助于将Spark输出文件解析为DataFrame:

PySpark:

from pyspark.sql.functions import *

def read_spark_output(file_path):
    step1 = spark.read \
             .option("header","true") \
             .option("inferSchema","true") \
             .option("delimiter","|") \
             .option("parserLib","UNIVOCITY") \
             .option("ignoreLeadingWhiteSpace","true") \
             .option("ignoreTrailingWhiteSpace","true") \
             .option("comment","+") \
             .csv("file://{}".format(file_path))
    # select not-null columns
    step2 = t.select([c for c in t.columns if not c.startswith("_")])
    # deal with 'null' string in column
    return step2.select(*[when(~col(col_name).eqNullSafe("null"), col(col_name)).alias(col_name) for col_name in step2.columns])
Run Code Online (Sandbox Code Playgroud)

斯卡拉:

// read Spark Output Fixed width table:
def readSparkOutput(filePath: String): org.apache.spark.sql.DataFrame = {
  val step1 = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", "|")
    .option("parserLib", "UNIVOCITY")
    .option("ignoreLeadingWhiteSpace", "true")
    .option("ignoreTrailingWhiteSpace", "true")
    .option("comment", "+")
    .csv(filePath)

  val step2 = step1.select(step1.columns.filterNot(_.startsWith("_c")).map(step1(_)): _*)

  val columns = step2.columns
  columns.foldLeft(step2)((acc, c) => acc.withColumn(c, when(col(c) =!= "null", col(c))))
}
Run Code Online (Sandbox Code Playgroud)

用法:

df = read_spark_output("file:///tmp/spark.out")
Run Code Online (Sandbox Code Playgroud)

PS:对于pyspark,eqNullSafe可以从spark 2.3.