根据Spark数据集介绍:
正如我们期待着星火2.0,我们计划将数据集的一些令人兴奋的改进,具体包括:...自定义编码器 - 而我们对各种各样的类型目前自动生成编码器,我们想开了一个API,用于自定义对象.
并尝试将自定义类型存储为Dataset导致以下错误:
无法找到存储在数据集中的类型的编码器.导入sqlContext.implicits._支持原始类型(Int,String等)和产品类型(case类).将来版本中将添加对序列化其他类型的支持
要么:
Java.lang.UnsupportedOperationException:找不到针对....的编码器
有没有现成的解决方法?
请注意,此问题仅作为社区Wiki答案的入口点存在.随意更新/改进问题和答案.
scala apache-spark apache-spark-dataset apache-spark-encoders
因此,在面向对象的世界中花费了多年的代码重用,设计模式和最佳实践总是被考虑在内,我发现自己在Spark世界中的代码组织和代码重用方面有些挣扎.
如果我尝试以可重用的方式编写代码,它几乎总是带来性能成本,我最终会将其重写为适合我的特定用例的最佳代码.这个常量"为这个特定用例编写最佳内容"也会影响代码组织,因为当"它们真的属于一个整体"时,将代码拆分成不同的对象或模块是困难的,因此我最终只得到很少的"上帝"对象包含长复杂变换链.事实上,我经常认为,如果我在面向对象世界工作时看到我现在正在写的大部分Spark代码,我会畏缩并将其视为"意大利面条代码".
我上网试图找到某种等同于面向对象世界的最佳实践,但没有太多运气.我可以找到一些函数式编程的"最佳实践",但Spark只增加了一个额外的层,因为性能是这里的一个主要因素.
所以我的问题是,你们中的任何人都有Spark专家发现了一些你可以推荐的编写Spark代码的最佳实践吗?
编辑
正如评论中所写,我实际上并没有希望有人就如何解决这个问题发表答案,而是我希望这个社区中的某个人遇到一些Martin Fowler类型,他曾在某处写过som文章或博客帖子关于如何解决Spark世界中代码组织的问题.
@DanielDarabos建议我举一个代码组织和性能相互矛盾的例子.虽然我发现我在日常工作中经常遇到这方面的问题,但我觉得把它归结为一个很好的最小例子有点困难;)但我会尝试.
在面向对象的世界里,我是单一责任原则的忠实粉丝,所以我要确保我的方法只对一件事负责.它使它们可重复使用并且易于测试.因此,如果我不得不计算列表中某些数字的总和(匹配某些标准)并且我必须计算相同数字的平均值,我肯定会创建两个方法 - 一个计算总和,一个计算平均值.像这样:
def main(implicit args: Array[String]): Unit = {
val list = List(("DK", 1.2), ("DK", 1.4), ("SE", 1.5))
println("Summed weights for DK = " + summedWeights(list, "DK")
println("Averaged weights for DK = " + averagedWeights(list, "DK")
}
def summedWeights(list: List, country: String): Double = {
list.filter(_._1 == country).map(_._2).sum
}
def averagedWeights(list: List, country: String): Double = {
val filteredByCountry = list.filter(_._1 == country)
filteredByCountry.map(_._2).sum/ filteredByCountry.length
}
Run Code Online (Sandbox Code Playgroud)
我当然可以继续尊重Spark中的SRP:
def main(implicit …Run Code Online (Sandbox Code Playgroud) 这个问题的目标是记录:
在PySpark中使用JDBC连接读取和写入数据所需的步骤
JDBC源和已知解决方案可能存在的问题
通过小的更改,这些方法应该与其他支持的语言一起使用,包括Scala和R.
更改数据类型我可以使用类似的东西
l1 <- c("fac1","fac2","fac3")
l2 <- c("dbl1","dbl2","dbl3")
dat[,l1] <- lapply(dat[,l1], factor)
dat[,l2] <- lapply(dat[,l2], as.numeric)
Run Code Online (Sandbox Code Playgroud)
同 dplyr
dat <- dat %>% mutate(
fac1 = factor(fac1), fac2 = factor(fac2), fac3 = factor(fac3),
dbl1 = as.numeric(dbl1), dbl2 = as.numeric(dbl2), dbl3 = as.numeric(dbl3)
)
Run Code Online (Sandbox Code Playgroud)
在dplyr中有更优雅(更短)的方式吗?
克里斯托夫
我正在寻找一些更好的解释python中通过spark提供的聚合功能.
我的例子如下(使用Spark 1.2.0版本的pyspark)
sc.parallelize([1,2,3,4]).aggregate(
(0, 0),
(lambda acc, value: (acc[0] + value, acc[1] + 1)),
(lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
Run Code Online (Sandbox Code Playgroud)
输出:
(10, 4)
Run Code Online (Sandbox Code Playgroud)
我得到的预期结果(10,4)是1+2+3+44个元素的总和.如果我改变传递给聚合函数初始值(1,0)从(0,0) 我得到以下结果
sc.parallelize([1,2,3,4]).aggregate(
(1, 0),
(lambda acc, value: (acc[0] + value, acc[1] + 1)),
(lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
Run Code Online (Sandbox Code Playgroud)
输出:
(19, 4)
Run Code Online (Sandbox Code Playgroud)
该值增加9.如果我将其更改为(2,0),则值将转到(28,4)依此类推.
有人可以向我解释这个值的计算方法吗?我预计价值会上升1而不是9,(11,4)我预计会看到(19,4).
我有一个数据集(user, product, review),并希望将其提供给mllib的ALS算法.
该算法需要用户和产品为数字,而我的是String用户名和字符串SKU.
现在,我获得了不同的用户和SKU,然后在Spark之外为他们分配数字ID.
我想知道是否有更好的方法来做到这一点.我想到的一种方法是编写一个自定义RDD,基本上枚举1到n,然后在两个RDD上调用zip.
如何在连接两个数据帧时提供更多列条件.例如,我想运行以下内容:
val Lead_all = Leads.join(Utm_Master,
Leaddetails.columns("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign") ==
Utm_Master.columns("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign"),
"left")
Run Code Online (Sandbox Code Playgroud)
我想只在这些列匹配时才加入.但是上面的语法无效,因为cols只需要一个字符串.那我怎么得到我想要的东西.
我原来的问题是为什么使用DecisionTreeModel.predict内部地图功能会引发异常?并且与如何使用MLlib在Spark上生成(原始标签,预测标签)的元组有关?
当我们使用Scala API时,推荐RDD[LabeledPoint]使用预测的方法DecisionTreeModel是简单地映射RDD:
val labelAndPreds = testData.map { point =>
val prediction = model.predict(point.features)
(point.label, prediction)
}
Run Code Online (Sandbox Code Playgroud)
遗憾的是,PySpark中的类似方法效果不佳:
labelsAndPredictions = testData.map(
lambda lp: (lp.label, model.predict(lp.features))
labelsAndPredictions.first()
Run Code Online (Sandbox Code Playgroud)
例外:您似乎尝试从广播变量,操作或转换引用SparkContext.SparkContext只能在驱动程序上使用,而不能在工作程序上运行的代码中使用.有关更多信息,请参阅SPARK-5063.
而不是官方文档推荐这样的东西:
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
Run Code Online (Sandbox Code Playgroud)
那么这里发生了什么?此处没有广播变量,Scala API定义predict如下:
/**
* Predict values for a single data point using the model trained.
*
* @param features array representing …Run Code Online (Sandbox Code Playgroud) 我想用类似SQL的IN子句过滤Pyspark DataFrame ,如
sc = SparkContext()
sqlc = SQLContext(sc)
df = sqlc.sql('SELECT * from my_df WHERE field1 IN a')
Run Code Online (Sandbox Code Playgroud)
a元组在哪里(1, 2, 3).我收到此错误:
java.lang.RuntimeException:[1.67]失败:``('''',但是找到了标识符
这基本上是说它期待类似'(1,2,3)'而不是a.问题是我不能手动写入a中的值,因为它是从另一个作业中提取的.
在这种情况下我该如何过滤?
Spark 1.4.1
我遇到一种情况,按数据框进行分组,然后对'count'列进行计数和过滤会引发下面的异常
import sqlContext.implicits._
import org.apache.spark.sql._
case class Paf(x:Int)
val myData = Seq(Paf(2), Paf(1), Paf(2))
val df = sc.parallelize(myData, 2).toDF()
Run Code Online (Sandbox Code Playgroud)
然后分组和过滤:
df.groupBy("x").count()
.filter("count >= 2")
.show()
Run Code Online (Sandbox Code Playgroud)
引发异常:
java.lang.RuntimeException: [1.7] failure: ``('' expected but `>=' found count >= 2
Run Code Online (Sandbox Code Playgroud)
解:
重命名列会使问题消失(因为我怀疑与插值'count'函数没有冲突'
df.groupBy("x").count()
.withColumnRenamed("count", "n")
.filter("n >= 2")
.show()
Run Code Online (Sandbox Code Playgroud)
那么,这是一种期望的行为,一个错误还是一种规范的方式?
谢谢,亚历克斯