Spark-land中有几个相似但又不同的概念,围绕着如何将工作分配到不同的节点并同时执行.具体来说,有:
sparkDriverCount)numWorkerNodes)numExecutors)dataFrame)dataFrame(numDFRows)中的行数dataFrame(numPartitions)上的分区数numCpuCoresPerWorker)我相信所有Spark集群都有一个且只有一个 Spark Driver,然后是0+个工作节点.如果我错了,请先纠正我!假设我或多或少是正确的,让我们在这里锁定一些变量.假设我们有一个带有1个驱动程序和4个工作节点的Spark集群,每个工作节点上有4个CPU核心(因此总共有16个CPU核心).所以这里的"给定"是:
sparkDriverCount = 1
numWorkerNodes = 4
numCpuCores = numWorkerNodes * numCpuCoresPerWorker = 4 * 4 = 16
Run Code Online (Sandbox Code Playgroud)
鉴于作为设置,我想知道如何确定一些事情.特别:
numWorkerNodes和之间有什么关系numExecutors?是否有一些已知/普遍接受的工人与遗嘱执行人的比例?有没有办法确定numExecutors给定numWorkerNodes(或任何其他输入)?numDFRows为numPartitions?如何根据dataFrame?的大小计算"最佳"分区数?numPartitions = numWorkerNodes * numCpuCoresPerWorker那有什么道理吗?换句话说,它规定每个CPU核心应该有一个分区.partitioning distributed-computing bigdata apache-spark spark-dataframe
我需要一个窗口函数,它按一些键(=列名称)进行分区,按另一个列名称进行排序,并返回前x行的行.
这适用于升序:
def getTopX(df: DataFrame, top_x: String, top_key: String, top_value:String): DataFrame ={
val top_keys: List[String] = top_key.split(", ").map(_.trim).toList
val w = Window.partitionBy(top_keys(1),top_keys.drop(1):_*)
.orderBy(top_value)
val rankCondition = "rn < "+top_x.toString
val dfTop = df.withColumn("rn",row_number().over(w))
.where(rankCondition).drop("rn")
return dfTop
}
Run Code Online (Sandbox Code Playgroud)
但是当我尝试将其更改为第4行orderBy(desc(top_value))或orderBy(top_value.desc)第4行时,我收到语法错误.这里的语法是什么?
我遇到了一个问题,我将Parquet数据作为S3中的每日块(以形式s3://bucketName/prefix/YYYY/MM/DD/)但我无法从不同的日期读取AWS EMR Spark中的数据,因为某些列类型不匹配,我得到许多异常中的一个,例如:
java.lang.ClassCastException: optional binary element (UTF8) is not a group
Run Code Online (Sandbox Code Playgroud)
在某些文件中出现的数组类型具有值,但同一列可能null在其他文件中具有值,然后将其推断为String类型.
要么
org.apache.spark.SparkException: Job aborted due to stage failure: Task 23 in stage 42.0 failed 4 times, most recent failure: Lost task 23.3 in stage 42.0 (TID 2189, ip-172-31-9-27.eu-west-1.compute.internal):
org.apache.spark.SparkException: Failed to merge incompatible data types ArrayType(StructType(StructField(Id,LongType,true), StructField(Name,StringType,true), StructField(Type,StringType,true)),true)
Run Code Online (Sandbox Code Playgroud)
我在S3中以JSON格式存在原始数据,我最初的计划是创建一个自动作业,启动一个EMR集群,读取前一个日期的JSON数据,然后将其作为镶木地板写回S3.
JSON数据也分为日期,即键具有日期前缀.阅读JSON工作正常.无论当前正在读取多少数据,都可以从数据中推断出模式.
但是当编写镶木地板文件时问题就会出现.据我所知,当我使用元数据文件编写镶木地板时,这些文件包含镶木地板文件的所有零件/分区的模式.对我而言,似乎也可以使用不同的模式.当我禁用写入元数据时,据说Spark从给定Parquet路径中的第一个文件推断整个模式,并假设它通过其他文件保持不变.
当一些应该是double类型的列只有给定日期的整数值时,从JSON读取它们(它们将这些数字作为整数,没有浮点数)使得Spark认为它是一个具有类型的列long.即使我可以在编写Parquet文件之前将这些列转换为double,但这仍然不好,因为架构可能会更改,可以添加新列,并且无法跟踪此列.
我看到有些人有同样的问题,但我还没有找到一个足够好的解决方案.
有什么最佳实践或解决方案?
我在PySpark(ML包)中训练了LogisticRegression模型,预测结果是PySpark DataFrame(cv_predictions)(参见[1]).该probability列(见[2])是一种vector类型(见[3]).
[1]
type(cv_predictions_prod)
pyspark.sql.dataframe.DataFrame
[2]
cv_predictions_prod.select('probability').show(10, False)
+----------------------------------------+
|probability |
+----------------------------------------+
|[0.31559134817066054,0.6844086518293395]|
|[0.8937864350711228,0.10621356492887715]|
|[0.8615878905395029,0.1384121094604972] |
|[0.9594427633777901,0.04055723662220989]|
|[0.5391547673698157,0.46084523263018434]|
|[0.2820729747752462,0.7179270252247538] |
|[0.7730465873083118,0.22695341269168817]|
|[0.6346585276598942,0.3653414723401058] |
|[0.6346585276598942,0.3653414723401058] |
|[0.637279255218404,0.362720744781596] |
+----------------------------------------+
only showing top 10 rows
[3]
cv_predictions_prod.printSchema()
root
...
|-- rawPrediction: vector (nullable = true)
|-- probability: vector (nullable = true)
|-- prediction: double (nullable = true)
Run Code Online (Sandbox Code Playgroud)
如何创建解析vectorPySpark DataFrame,以便创建一个新列,只拉取每个probability向量的第一个元素?
这个问题类似于,但下面链接中的解决方案不起作用/我不清楚:
如何使用以下方法计算每组的累积总和DataFrame abstraction; 在PySpark?
使用示例数据集如下:
df = sqlContext.createDataFrame( [(1,2,"a"),(3,2,"a"),(1,3,"b"),(2,2,"a"),(2,3,"b")],
["time", "value", "class"] )
+----+-----+-----+
|time|value|class|
+----+-----+-----+
| 1| 2| a|
| 3| 2| a|
| 1| 3| b|
| 2| 2| a|
| 2| 3| b|
+----+-----+-----+
Run Code Online (Sandbox Code Playgroud)
我想value为class(有序)time变量添加每个分组的累积和列.
我有几百个文件夹,每个文件夹有成千上万的gzip文本文件,我试图将它们读入数据框中spark.read.csv().
在这些文件中,有些文件的长度为零,导致错误:
java.io.EOFException:输入流的意外结束
码:
df = spark.read.csv('s3n://my-bucket/folder*/logfiles*.log.gz',sep='\t',schema=schema)
Run Code Online (Sandbox Code Playgroud)
我已经尝试设置mode到DROPMALFORMED与阅读sc.textFile(),但没有运气.
处理空的或损坏的gzip文件的最佳方法是什么?
后:
val df = Seq((1, Vector(2, 3, 4)), (1, Vector(2, 3, 4))).toDF("Col1", "Col2")
Run Code Online (Sandbox Code Playgroud)
我在Apache Spark中有这个DataFrame:
+------+---------+
| Col1 | Col2 |
+------+---------+
| 1 |[2, 3, 4]|
| 1 |[2, 3, 4]|
+------+---------+
Run Code Online (Sandbox Code Playgroud)
我如何将其转换为:
+------+------+------+------+
| Col1 | Col2 | Col3 | Col4 |
+------+------+------+------+
| 1 | 2 | 3 | 4 |
| 1 | 2 | 3 | 4 |
+------+------+------+------+
Run Code Online (Sandbox Code Playgroud) 我有一个熊猫数据框my_df,并my_df.dtypes给我们:
ts int64
fieldA object
fieldB object
fieldC object
fieldD object
fieldE object
dtype: object
Run Code Online (Sandbox Code Playgroud)
然后我尝试通过以下操作将pandas数据帧my_df转换为spark数据框:
spark_my_df = sc.createDataFrame(my_df)
Run Code Online (Sandbox Code Playgroud)
但是,我收到以下错误:
ValueErrorTraceback (most recent call last)
<ipython-input-29-d4c9bb41bb1e> in <module>()
----> 1 spark_my_df = sc.createDataFrame(my_df)
2 spark_my_df.take(20)
/usr/local/spark-latest/python/pyspark/sql/session.py in createDataFrame(self, data, schema, samplingRatio)
520 rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
521 else:
--> 522 rdd, schema = self._createFromLocal(map(prepare, data), schema)
523 jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
524 jdf = self._jsparkSession.applySchemaToPythonRDD(jrdd.rdd(), schema.json())
/usr/local/spark-latest/python/pyspark/sql/session.py in _createFromLocal(self, data, schema)
384
385 …Run Code Online (Sandbox Code Playgroud) 我们这里有两个数据框:
预期的数据帧:
+------+---------+--------+----------+-------+--------+
|emp_id| emp_city|emp_name| emp_phone|emp_sal|emp_site|
+------+---------+--------+----------+-------+--------+
| 3| Chennai| rahman|9848022330| 45000|SanRamon|
| 1|Hyderabad| ram|9848022338| 50000| SF|
| 2|Hyderabad| robin|9848022339| 40000| LA|
| 4| sanjose| romin|9848022331| 45123|SanRamon|
+------+---------+--------+----------+-------+--------+
Run Code Online (Sandbox Code Playgroud)
和实际数据框:
+------+---------+--------+----------+-------+--------+
|emp_id| emp_city|emp_name| emp_phone|emp_sal|emp_site|
+------+---------+--------+----------+-------+--------+
| 3| Chennai| rahman|9848022330| 45000|SanRamon|
| 1|Hyderabad| ram|9848022338| 50000| SF|
| 2|Hyderabad| robin|9848022339| 40000| LA|
| 4| sanjose| romino|9848022331| 45123|SanRamon|
+------+---------+--------+----------+-------+--------+
Run Code Online (Sandbox Code Playgroud)
两个数据帧之间的区别现在是:
+------+--------+--------+----------+-------+--------+
|emp_id|emp_city|emp_name| emp_phone|emp_sal|emp_site|
+------+--------+--------+----------+-------+--------+
| 4| sanjose| romino|9848022331| 45123|SanRamon|
+------+--------+--------+----------+-------+--------+
Run Code Online (Sandbox Code Playgroud)
我们使用的是except函数df1.except(df2),但问题是,它返回的是不同的整行.我们想要的是查看该行中哪些列是不同的(在这种情况下,"romin"和"emp_name"中的"romino"不同).我们遇到了巨大的困难,任何帮助都会很棒.
我有以下动态创建的Spark数据帧:
val sf1 = StructField("name", StringType, nullable = true)
val sf2 = StructField("sector", StringType, nullable = true)
val sf3 = StructField("age", IntegerType, nullable = true)
val fields = List(sf1,sf2,sf3)
val schema = StructType(fields)
val row1 = Row("Andy","aaa",20)
val row2 = Row("Berta","bbb",30)
val row3 = Row("Joe","ccc",40)
val data = Seq(row1,row2,row3)
val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")
Run Code Online (Sandbox Code Playgroud)
现在,我需要迭代每一行和sqlDF每列打印每一列,这是我的尝试:
sqlDF.foreach { row =>
row.foreach { col => println(col) }
}
Run Code Online (Sandbox Code Playgroud)
row是类型Row,但不可迭代,这就是为什么此代码抛出编译错误 …
spark-dataframe ×10
apache-spark ×8
pyspark ×4
scala ×3
bigdata ×2
python ×2
compare ×1
dataframe ×1
emr ×1
pandas ×1
parquet ×1
partitioning ×1
pyspark-sql ×1
python-2.7 ×1