我更喜欢Python而不是Scala.但是,由于Spark本身是用Scala编写的,因此我希望我的代码在Scala中的运行速度比Python版本快,原因很明显.
有了这个假设,我想学习和编写一些非常常见的预处理代码的Scala版本,用于1 GB的数据.数据来自Kaggle的SpringLeaf比赛.只是为了概述数据(它包含1936个维度和145232行).数据由各种类型组成,例如int,float,string,boolean.我使用8个核心中的6个用于Spark处理; 这就是我使用的原因minPartitions=6,每个核心都有一些东西需要处理.
Scala代码
val input = sc.textFile("train.csv", minPartitions=6)
val input2 = input.mapPartitionsWithIndex { (idx, iter) =>
if (idx == 0) iter.drop(1) else iter }
val delim1 = "\001"
def separateCols(line: String): Array[String] = {
val line2 = line.replaceAll("true", "1")
val line3 = line2.replaceAll("false", "0")
val vals: Array[String] = line3.split(",")
for((x,i) <- vals.view.zipWithIndex) {
vals(i) = "VAR_%04d".format(i) + delim1 + x
}
vals
}
val input3 = input2.flatMap(separateCols)
def toKeyVal(line: String): (String, String) = { …Run Code Online (Sandbox Code Playgroud) 根据Spark数据集介绍:
正如我们期待着星火2.0,我们计划将数据集的一些令人兴奋的改进,具体包括:...自定义编码器 - 而我们对各种各样的类型目前自动生成编码器,我们想开了一个API,用于自定义对象.
并尝试将自定义类型存储为Dataset导致以下错误:
无法找到存储在数据集中的类型的编码器.导入sqlContext.implicits._支持原始类型(Int,String等)和产品类型(case类).将来版本中将添加对序列化其他类型的支持
要么:
Java.lang.UnsupportedOperationException:找不到针对....的编码器
有没有现成的解决方法?
请注意,此问题仅作为社区Wiki答案的入口点存在.随意更新/改进问题和答案.
scala apache-spark apache-spark-dataset apache-spark-encoders
请考虑以下代码段(假设spark已设置为某些代码段SparkSession):
from pyspark.sql import Row
source_data = [
Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]),
Row(city="New York", temperatures=[-7.0, -7.0, -5.0]),
]
df = spark.createDataFrame(source_data)
Run Code Online (Sandbox Code Playgroud)
请注意,temperature字段是浮动列表.我想将这些浮点数列表转换为MLlib类型Vector,我希望使用基本DataFrameAPI 表示这种转换,而不是通过RDD表达(这是低效的,因为它将所有数据从JVM发送到Python,处理在Python中完成,我们没有得到Spark的Catalyst优化器,yada yada的好处.我该怎么做呢?特别:
这就是我期望的"正确"解决方案.我想将列的类型从一种类型转换为另一种类型,所以我应该使用强制转换.作为一个上下文,让我提醒您将其转换为另一种类型的正常方法:
from pyspark.sql import types
df_with_strings = df.select(
df["city"],
df["temperatures"].cast(types.ArrayType(types.StringType()))),
)
Run Code Online (Sandbox Code Playgroud)
现在例如df_with_strings.collect()[0]["temperatures"][1]是'-7.0'.但是如果我施放到ml Vector那么事情就不那么顺利了:
from pyspark.ml.linalg import VectorUDT
df_with_vectors = df.select(df["city"], df["temperatures"].cast(VectorUDT()))
Run Code Online (Sandbox Code Playgroud)
这给出了一个错误:
pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type …Run Code Online (Sandbox Code Playgroud) python apache-spark apache-spark-sql pyspark apache-spark-ml
我仍在努力了解最近推出的Spark数据集的全部功能.
是否有何时使用RDD以及何时使用数据集的最佳实践?
在他们的公告中, Databricks解释说,通过使用数据集,可以实现运行时和内存的惊人减少.仍然声称数据集被设计为"与现有的RDD API一起工作".
这只是向下兼容性的参考,还是有人宁愿在数据集上使用RDD?
我正在尝试了解DataFrame列类型.当然,DataFrame不是物化对象,它只是Spark的一组指令,将来转换为代码.但我想象这个类型列表表示在执行操作时可能在JVM内部实现的对象类型.
import pyspark
import pyspark.sql.types as T
import pyspark.sql.functions as F
data = [0, 3, 0, 4]
d = {}
d['DenseVector'] = pyspark.ml.linalg.DenseVector(data)
d['old_DenseVector'] = pyspark.mllib.linalg.DenseVector(data)
d['SparseVector'] = pyspark.ml.linalg.SparseVector(4, dict(enumerate(data)))
d['old_SparseVector'] = pyspark.mllib.linalg.SparseVector(4, dict(enumerate(data)))
df = spark.createDataFrame([d])
df.printSchema()
Run Code Online (Sandbox Code Playgroud)
四个向量值的列在printSchema()(或schema)中看起来相同:
root
|-- DenseVector: vector (nullable = true)
|-- SparseVector: vector (nullable = true)
|-- old_DenseVector: vector (nullable = true)
|-- old_SparseVector: vector (nullable = true)
Run Code Online (Sandbox Code Playgroud)
但是当我逐行检索它们时,它们会变得不同:
> for x in df.first().asDict().items():
print(x[0], type(x[1]))
(2) Spark Jobs
old_SparseVector …Run Code Online (Sandbox Code Playgroud) dataframe apache-spark pyspark apache-spark-ml apache-spark-mllib
我有一些数据存储为镶木地板文件和与数据模式匹配的案例类。Spark 可以很好地处理常规产品类型,所以如果我有
case class A(s:String, i:Int)
Run Code Online (Sandbox Code Playgroud)
我可以轻松做到
spark.read.parquet(file).as[A]
Run Code Online (Sandbox Code Playgroud)
但据我了解,Spark 不处理析取类型,因此当我的 parquet 中有枚举(之前编码为整数)和 scala 表示形式时,如
sealed trait E
case object A extends E
case object B extends E
Run Code Online (Sandbox Code Playgroud)
我不能做
spark.read.parquet(file).as[E]
// java.lang.UnsupportedOperationException: No Encoder found for E
Run Code Online (Sandbox Code Playgroud)
到目前为止是有道理的,但后来,可能太天真了,我尝试
implicit val eEncoder = new org.apache.spark.sql.Encoder[E] {
def clsTag = ClassTag(classOf[E])
def schema = StructType(StructField("e", IntegerType, nullable = false)::Nil)
}
Run Code Online (Sandbox Code Playgroud)
我仍然得到相同的“没有找到 E 的编码器”:(
我现在的问题是,为什么范围内隐式缺失?(或者不被识别为编码器[E]),即使它被识别,这样的接口如何让我真正解码数据?我仍然需要将值映射到正确的案例对象。
我确实读过一个相关的答案,上面写着“TL;DR 目前没有好的解决方案,并且考虑到 Spark SQL/数据集的实现,在可预见的将来不太可能有一个解决方案。” 但我很难理解为什么自定义编码器无法做到这一点。
我正在尝试实现自定义UDT并能够从Spark SQL引用它(如Spark SQL白皮书的第4.4.2节中所述)。
真正的例子是使用Cap'n Proto或类似方法,使自定义UDT由堆外数据结构提供支持。
对于这篇文章,我做了一个人为的例子。我知道我可以只使用Scala案例类,而不必做任何工作,但这不是我的目标。
例如,我有一个Person包含多个属性,并且希望能够SELECT person.first_name FROM person。我遇到了错误Can't extract value from person#1,但不确定为什么。
这是完整的源代码(也可以从https://github.com/andygrove/spark-sql-udt获取)。
package com.theotherandygrove
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
object Example {
def main(arg: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("Example")
.setMaster("local[*]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val schema = StructType(List(
StructField("person_id", DataTypes.IntegerType, true),
StructField("person", new MockPersonUDT, true)))
// load initial RDD
val rdd = sc.parallelize(List(
MockPersonImpl(1),
MockPersonImpl(2) …Run Code Online (Sandbox Code Playgroud) 我正在尝试将UDF与输入类型Array of struct一起使用.我有以下数据结构,这只是更大结构的相关部分
|--investments: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- funding_round: struct (nullable = true)
| | | |-- company: struct (nullable = true)
| | | | |-- name: string (nullable = true)
| | | | |-- permalink: string (nullable = true)
| | | |-- funded_day: long (nullable = true)
| | | |-- funded_month: long (nullable = true)
| | | |-- funded_year: long (nullable = true)
| …Run Code Online (Sandbox Code Playgroud) user-defined-functions dataframe apache-spark apache-spark-sql
我正在与其他用户共享的集群上使用 Spark。所以仅仅根据运行时间来判断我的哪一个代码运行效率更高是不可靠的。因为当我运行更高效的代码时,其他人可能会运行大量数据,并使我的代码执行更长时间。
那么我可以在这里问两个问题吗:
我正在使用joinfunction 来 join 2RDDs并且我尝试groupByKey()在 using 之前使用join,如下所示:
rdd1.groupByKey().join(rdd2)
Run Code Online (Sandbox Code Playgroud)
似乎花了更长的时间,但是我记得当我使用 Hadoop Hive 时,group by 让我的查询运行得更快。由于 Spark 使用惰性求值,我想知道groupByKeybefore是否join会让事情变得更快
我注意到Spark有一个SQL模块,到目前为止我真的没有时间尝试它,但是我可以问一下SQL模块和RDD SQL类似功能之间有什么区别吗?