标签: spark-dataframe

根据worker,core和DataFrame大小确定Spark分区的最佳数量

Spark-land中有几个相似但又不同的概念,围绕着如何将工作分配到不同的节点并同时执行.具体来说,有:

  • Spark Driver节点(sparkDriverCount)
  • Spark群集可用的工作节点数(numWorkerNodes)
  • Spark执行器的数量(numExecutors)
  • 由所有工人/执行者同时操作的DataFrame(dataFrame)
  • dataFrame(numDFRows)中的行数
  • dataFrame(numPartitions)上的分区数
  • 最后,每个工作节点上可用的CPU核心数量(numCpuCoresPerWorker)

相信所有Spark集群都有一个且只有一个 Spark Driver,然后是0+个工作节点.如果我错了,请先纠正我!假设我或多或少是正确的,让我们在这里锁定一些变量.假设我们有一个带有1个驱动程序和4个工作节点的Spark集群,每个工作节点上有4个CPU核心(因此总共有16个CPU核心).所以这里的"给定"是:

sparkDriverCount = 1
numWorkerNodes = 4
numCpuCores = numWorkerNodes * numCpuCoresPerWorker = 4 * 4 = 16
Run Code Online (Sandbox Code Playgroud)

鉴于作为设置,我想知道如何确定一些事情.特别:

  • numWorkerNodes和之间有什么关系numExecutors?是否有一些已知/普遍接受的工人与遗嘱执行人的比例?有没有办法确定numExecutors给定numWorkerNodes(或任何其他输入)?
  • 是否已知/普遍接受/最佳比率numDFRowsnumPartitions?如何根据dataFrame?的大小计算"最佳"分区数?
  • 我从其他工程师那里得知,一般的"经验法则"是:numPartitions = numWorkerNodes * numCpuCoresPerWorker那有什么道理吗?换句话说,它规定每个CPU核心应该有一个分区.

partitioning distributed-computing bigdata apache-spark spark-dataframe

20
推荐指数
1
解决办法
1万
查看次数

如何在Spark窗口函数中使用降序的orderby()?

我需要一个窗口函数,它按一些键(=列名称)进行分区,按另一个列名称进行排序,并返回前x行的行.

这适用于升序:

def getTopX(df: DataFrame, top_x: String, top_key: String, top_value:String): DataFrame ={
    val top_keys: List[String] = top_key.split(", ").map(_.trim).toList
    val w = Window.partitionBy(top_keys(1),top_keys.drop(1):_*)
       .orderBy(top_value)
    val rankCondition = "rn < "+top_x.toString
    val dfTop = df.withColumn("rn",row_number().over(w))
      .where(rankCondition).drop("rn")
  return dfTop
}
Run Code Online (Sandbox Code Playgroud)

但是当我尝试将其更改为第4行orderBy(desc(top_value))orderBy(top_value.desc)第4行时,我收到语法错误.这里的语法是什么?

scala apache-spark apache-spark-sql spark-dataframe

19
推荐指数
2
解决办法
4万
查看次数

如何在Apache Spark中处理更改镶木地板模式

我遇到了一个问题,我将Parquet数据作为S3中的每日块(以形式s3://bucketName/prefix/YYYY/MM/DD/)但我无法从不同的日期读取AWS EMR Spark中的数据,因为某些列类型不匹配,我得到许多异常中的一个,例如:

java.lang.ClassCastException: optional binary element (UTF8) is not a group
Run Code Online (Sandbox Code Playgroud)

在某些文件中出现的数组类型具有值,但同一列可能null在其他文件中具有值,然后将其推断为String类型.

要么

org.apache.spark.SparkException: Job aborted due to stage failure: Task 23 in stage 42.0 failed 4 times, most recent failure: Lost task 23.3 in stage 42.0 (TID 2189, ip-172-31-9-27.eu-west-1.compute.internal):
org.apache.spark.SparkException: Failed to merge incompatible data types ArrayType(StructType(StructField(Id,LongType,true), StructField(Name,StringType,true), StructField(Type,StringType,true)),true)
Run Code Online (Sandbox Code Playgroud)

我在S3中以JSON格式存在原始数据,我最初的计划是创建一个自动作业,启动一个EMR集群,读取前一个日期的JSON数据,然后将其作为镶木地板写回S3.

JSON数据也分为日期,即键具有日期前缀.阅读JSON工作正常.无论当前正在读取多少数据,都可以从数据中推断出模式.

但是当编写镶木地板文件时问题就会出现.据我所知,当我使用元数据文件编写镶木地板时,这些文件包含镶木地板文件的所有零件/分区的模式.对我而言,似乎也可以使用不同的模式.当我禁用写入元数据时,据说Spark从给定Parquet路径中的第一个文件推断整个模式,并假设它通过其他文件保持不变.

当一些应该是double类型的列只有给定日期的整数值时,从JSON读取它们(它们将这些数字作为整数,没有浮点数)使得Spark认为它是一个具有类型的列long.即使我可以在编写Parquet文件之前将这些列转换为double,但这仍然不好,因为架构可能会更改,可以添加新列,并且无法跟踪此列.

我看到有些人有同样的问题,但我还没有找到一个足够好的解决方案.

有什么最佳实践或解决方案?

emr apache-spark parquet apache-spark-sql spark-dataframe

19
推荐指数
2
解决办法
1万
查看次数

Spark DataFrame中向量的访问元素(Logistic回归概率向量)

我在PySpark(ML包)中训练了LogisticRegression模型,预测结果是PySpark DataFrame(cv_predictions)(参见[1]).该probability列(见[2])是一种vector类型(见[3]).

[1]
type(cv_predictions_prod)
pyspark.sql.dataframe.DataFrame

[2]
cv_predictions_prod.select('probability').show(10, False)
+----------------------------------------+
|probability                             |
+----------------------------------------+
|[0.31559134817066054,0.6844086518293395]|
|[0.8937864350711228,0.10621356492887715]|
|[0.8615878905395029,0.1384121094604972] |
|[0.9594427633777901,0.04055723662220989]|
|[0.5391547673698157,0.46084523263018434]|
|[0.2820729747752462,0.7179270252247538] |
|[0.7730465873083118,0.22695341269168817]|
|[0.6346585276598942,0.3653414723401058] |
|[0.6346585276598942,0.3653414723401058] |
|[0.637279255218404,0.362720744781596]   |
+----------------------------------------+
only showing top 10 rows

[3]
cv_predictions_prod.printSchema()
root
 ...
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = true)
Run Code Online (Sandbox Code Playgroud)

如何创建解析vectorPySpark DataFrame,以便创建一个新列,只拉取每个probability向量的第一个元素?

这个问题类似于,但下面链接中的解决方案不起作用/我不清楚:

如何在PySpark中访问denseVector的值

如何访问Spark DataFrame中VectorUDT列的元素?

python apache-spark pyspark spark-dataframe apache-spark-ml

19
推荐指数
1
解决办法
1万
查看次数

使用DataFrame按组分组的Python Spark累积和

如何使用以下方法计算每组的累积总和DataFrame abstraction; 在PySpark

使用示例数据集如下:

df = sqlContext.createDataFrame( [(1,2,"a"),(3,2,"a"),(1,3,"b"),(2,2,"a"),(2,3,"b")], 
                                 ["time", "value", "class"] )

+----+-----+-----+
|time|value|class|
+----+-----+-----+
|   1|    2|    a|
|   3|    2|    a|
|   1|    3|    b|
|   2|    2|    a|
|   2|    3|    b|
+----+-----+-----+
Run Code Online (Sandbox Code Playgroud)

我想valueclass(有序)time变量添加每个分组的累积和列.

apache-spark pyspark spark-dataframe

19
推荐指数
3
解决办法
1万
查看次数

Spark - 阅读时如何跳过或忽略空的gzip文件

我有几百个文件夹,每个文件夹有成千上万的gzip文本文件,我试图将它们读入数据框中spark.read.csv().

在这些文件中,有些文件的长度为零,导致错误:

java.io.EOFException:输入流的意外结束

码:

df = spark.read.csv('s3n://my-bucket/folder*/logfiles*.log.gz',sep='\t',schema=schema)
Run Code Online (Sandbox Code Playgroud)

我已经尝试设置modeDROPMALFORMED与阅读sc.textFile(),但没有运气.

处理空的或损坏的gzip文件的最佳方法是什么?

pyspark spark-dataframe pyspark-sql

18
推荐指数
1
解决办法
4386
查看次数

如何爆炸列?

后:

val df = Seq((1, Vector(2, 3, 4)), (1, Vector(2, 3, 4))).toDF("Col1", "Col2")
Run Code Online (Sandbox Code Playgroud)

我在Apache Spark中有这个DataFrame:

+------+---------+
| Col1 | Col2    |
+------+---------+
|  1   |[2, 3, 4]|
|  1   |[2, 3, 4]|
+------+---------+
Run Code Online (Sandbox Code Playgroud)

我如何将其转换为:

+------+------+------+------+
| Col1 | Col2 | Col3 | Col4 |
+------+------+------+------+
|  1   |  2   |  3   |  4   |
|  1   |  2   |  3   |  4   |
+------+------+------+------+
Run Code Online (Sandbox Code Playgroud)

dataframe apache-spark spark-dataframe

17
推荐指数
1
解决办法
2万
查看次数

pyspark:ValueError:推断后无法确定某些类型

我有一个熊猫数据框my_df,并my_df.dtypes给我们:

ts              int64
fieldA         object
fieldB         object
fieldC         object
fieldD         object
fieldE         object
dtype: object
Run Code Online (Sandbox Code Playgroud)

然后我尝试通过以下操作将pandas数据帧my_df转换为spark数据框:

spark_my_df = sc.createDataFrame(my_df)
Run Code Online (Sandbox Code Playgroud)

但是,我收到以下错误:

ValueErrorTraceback (most recent call last)
<ipython-input-29-d4c9bb41bb1e> in <module>()
----> 1 spark_my_df = sc.createDataFrame(my_df)
      2 spark_my_df.take(20)

/usr/local/spark-latest/python/pyspark/sql/session.py in createDataFrame(self, data, schema, samplingRatio)
    520             rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
    521         else:
--> 522             rdd, schema = self._createFromLocal(map(prepare, data), schema)
    523         jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
    524         jdf = self._jsparkSession.applySchemaToPythonRDD(jrdd.rdd(), schema.json())

/usr/local/spark-latest/python/pyspark/sql/session.py in _createFromLocal(self, data, schema)
    384 
    385 …
Run Code Online (Sandbox Code Playgroud)

python python-2.7 pandas pyspark spark-dataframe

17
推荐指数
4
解决办法
1万
查看次数

如何比较scala中不同的两个数据框和打印列

我们这里有两个数据框:

预期的数据帧:

+------+---------+--------+----------+-------+--------+
|emp_id| emp_city|emp_name| emp_phone|emp_sal|emp_site|
+------+---------+--------+----------+-------+--------+
|     3|  Chennai|  rahman|9848022330|  45000|SanRamon|
|     1|Hyderabad|     ram|9848022338|  50000|      SF|
|     2|Hyderabad|   robin|9848022339|  40000|      LA|
|     4|  sanjose|   romin|9848022331|  45123|SanRamon|
+------+---------+--------+----------+-------+--------+
Run Code Online (Sandbox Code Playgroud)

和实际数据框:

+------+---------+--------+----------+-------+--------+
|emp_id| emp_city|emp_name| emp_phone|emp_sal|emp_site|
+------+---------+--------+----------+-------+--------+
|     3|  Chennai|  rahman|9848022330|  45000|SanRamon|
|     1|Hyderabad|     ram|9848022338|  50000|      SF|
|     2|Hyderabad|   robin|9848022339|  40000|      LA|
|     4|  sanjose|  romino|9848022331|  45123|SanRamon|
+------+---------+--------+----------+-------+--------+
Run Code Online (Sandbox Code Playgroud)

两个数据帧之间的区别现在是:

+------+--------+--------+----------+-------+--------+
|emp_id|emp_city|emp_name| emp_phone|emp_sal|emp_site|
+------+--------+--------+----------+-------+--------+
|     4| sanjose|  romino|9848022331|  45123|SanRamon|
+------+--------+--------+----------+-------+--------+
Run Code Online (Sandbox Code Playgroud)

我们使用的是except函数df1.except(df2),但问题是,它返回的是不同的整行.我们想要的是查看该行中哪些列是不同的(在这种情况下,"romin"和"emp_name"中的"romino"不同).我们遇到了巨大的困难,任何帮助都会很棒.

compare scala bigdata apache-spark spark-dataframe

17
推荐指数
1
解决办法
3万
查看次数

迭代Spark数据帧中的行和列

我有以下动态创建的Spark数据帧:

val sf1 = StructField("name", StringType, nullable = true)
val sf2 = StructField("sector", StringType, nullable = true)
val sf3 = StructField("age", IntegerType, nullable = true)

val fields = List(sf1,sf2,sf3)
val schema = StructType(fields)

val row1 = Row("Andy","aaa",20)
val row2 = Row("Berta","bbb",30)
val row3 = Row("Joe","ccc",40)

val data = Seq(row1,row2,row3)

val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")
Run Code Online (Sandbox Code Playgroud)

现在,我需要迭代每一行和sqlDF每列打印每一列,这是我的尝试:

sqlDF.foreach { row =>
  row.foreach { col => println(col) }
}
Run Code Online (Sandbox Code Playgroud)

row是类型Row,但不可迭代,这就是为什么此代码抛出编译错误 …

scala apache-spark apache-spark-sql spark-dataframe

17
推荐指数
4
解决办法
5万
查看次数