小编zer*_*323的帖子

如何在Dataset中存储自定义对象?

根据Spark数据集介绍:

正如我们期待着星火2.0,我们计划将数据集的一些令人兴奋的改进,具体包括:...自定义编码器 - 而我们对各种各样的类型目前自动生成编码器,我们想开了一个API,用于自定义对象.

并尝试将自定义类型存储为Dataset导致以下错误:

无法找到存储在数据集中的类型的编码器.导入sqlContext.implicits._支持原始类型(Int,String等)和产品类型(case类).将来版本中将添加对序列化其他类型的支持

要么:

Java.lang.UnsupportedOperationException:找不到针对....的编码器

有没有现成的解决方法?


请注意,此问题仅作为社区Wiki答案的入口点存在.随意更新/改进问题和答案.

scala apache-spark apache-spark-dataset apache-spark-encoders

133
推荐指数
4
解决办法
6万
查看次数

Spark代码组织和最佳实践

因此,在面向对象的世界中花费了多年的代码重用,设计模式和最佳实践总是被考虑在内,我发现自己在Spark世界中的代码组织和代码重用方面有些挣扎.

如果我尝试以可重用的方式编写代码,它几乎总是带来性能成本,我最终会将其重写为适合我的特定用例的最佳代码.这个常量"为这个特定用例编写最佳内容"也会影响代码组织,因为当"它们真的属于一个整体"时,将代码拆分成不同的对象或模块是困难的,因此我最终只得到很少的"上帝"对象包含长复杂变换链.事实上,我经常认为,如果我在面向对象世界工作时看到我现在正在写的大部分Spark代码,我会畏缩并将其视为"意大利面条代码".

我上网试图找到某种等同于面向对象世界的最佳实践,但没有太多运气.我可以找到一些函数式编程的"最佳实践",但Spark只增加了一个额外的层,因为性能是这里的一个主要因素.

所以我的问题是,你们中的任何人都有Spark专家发现了一些你可以推荐的编写Spark代码的最佳实践吗?

编辑

正如评论中所写,我实际上并没有希望有人就如何解决这个问题发表答案,而是我希望这个社区中的某个人遇到一些Martin Fowler类型,他曾在某处写过som文章或博客帖子关于如何解决Spark世界中代码组织的问题.

@DanielDarabos建议我举一个代码组织和性能相互矛盾的例子.虽然我发现我在日常工作中经常遇到这方面的问题,但我觉得把它归结为一个很好的最小例子有点困难;)但我会尝试.

在面向对象的世界里,我是单一责任原则的忠实粉丝,所以我要确保我的方法只对一件事负责.它使它们可重复使用并且易于测试.因此,如果我不得不计算列表中某些数字的总和(匹配某些标准)并且我必须计算相同数字的平均值,我肯定会创建两个方法 - 一个计算总和,一个计算平均值.像这样:

def main(implicit args: Array[String]): Unit = {
  val list = List(("DK", 1.2), ("DK", 1.4), ("SE", 1.5))

  println("Summed weights for DK = " + summedWeights(list, "DK")
  println("Averaged weights for DK = " + averagedWeights(list, "DK")
}

def summedWeights(list: List, country: String): Double = {
  list.filter(_._1 == country).map(_._2).sum
}

def averagedWeights(list: List, country: String): Double = {
  val filteredByCountry = list.filter(_._1 == country) 
  filteredByCountry.map(_._2).sum/ filteredByCountry.length
}
Run Code Online (Sandbox Code Playgroud)

我当然可以继续尊重Spark中的SRP:

def main(implicit …
Run Code Online (Sandbox Code Playgroud)

functional-programming code-organization apache-spark

65
推荐指数
1
解决办法
6675
查看次数

如何使用JDBC源在(Py)Spark中写入和读取数据?

这个问题的目标是记录:

  • 在PySpark中使用JDBC连接读取和写入数据所需的步骤

  • JDBC源和已知解决方案可能存在的问题

通过小的更改,这些方法应该与其他支持的语言一起使用,包括Scala和R.

python scala apache-spark apache-spark-sql pyspark

57
推荐指数
1
解决办法
6万
查看次数

dplyr更改了许多数据类型

更改数据类型我可以使用类似的东西

l1 <- c("fac1","fac2","fac3")
l2 <- c("dbl1","dbl2","dbl3")
dat[,l1] <- lapply(dat[,l1], factor)
dat[,l2] <- lapply(dat[,l2], as.numeric)
Run Code Online (Sandbox Code Playgroud)

dplyr

dat <- dat %>% mutate(
    fac1 = factor(fac1), fac2 = factor(fac2), fac3 = factor(fac3),
    dbl1 = as.numeric(dbl1), dbl2 = as.numeric(dbl2), dbl3 = as.numeric(dbl3)
)
Run Code Online (Sandbox Code Playgroud)

在dplyr中有更优雅(更短)的方式吗?

克里斯托夫

r dataframe dplyr

52
推荐指数
5
解决办法
8万
查看次数

解释Spark中的聚合功能

我正在寻找一些更好的解释python中通过spark提供的聚合功能.

我的例子如下(使用Spark 1.2.0版本的pyspark)

sc.parallelize([1,2,3,4]).aggregate(
  (0, 0),
  (lambda acc, value: (acc[0] + value, acc[1] + 1)),
  (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
Run Code Online (Sandbox Code Playgroud)

输出:

(10, 4)
Run Code Online (Sandbox Code Playgroud)

我得到的预期结果(10,4)1+2+3+44个元素的总和.如果我改变传递给聚合函数初始值(1,0)(0,0) 我得到以下结果

sc.parallelize([1,2,3,4]).aggregate(
    (1, 0),
    (lambda acc, value: (acc[0] + value, acc[1] + 1)),
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
Run Code Online (Sandbox Code Playgroud)

输出:

(19, 4)
Run Code Online (Sandbox Code Playgroud)

该值增加9.如果我将其更改为(2,0),则值将转到(28,4)依此类推.

有人可以向我解释这个值的计算方法吗?我预计价值会上升1而不是9,(11,4)我预计会看到(19,4).

python lambda aggregate apache-spark rdd

50
推荐指数
3
解决办法
3万
查看次数

如何为Spark RDD中的元素分配唯一的连续数字

我有一个数据集(user, product, review),并希望将其提供给mllib的ALS算法.

该算法需要用户和产品为数字,而我的是String用户名和字符串SKU.

现在,我获得了不同的用户和SKU,然后在Spark之外为他们分配数字ID.

我想知道是否有更好的方法来做到这一点.我想到的一种方法是编写一个自定义RDD,基本上枚举1到n,然后在两个RDD上调用zip.

apache-spark apache-spark-mllib

46
推荐指数
3
解决办法
2万
查看次数

Spark为数据帧连接指定多个列条件

如何在连接两个数据帧时提供更多列条件.例如,我想运行以下内容:

val Lead_all = Leads.join(Utm_Master,  
    Leaddetails.columns("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign") ==
    Utm_Master.columns("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign"),
"left")
Run Code Online (Sandbox Code Playgroud)

我想只在这些列匹配时才加入.但是上面的语法无效,因为cols只需要一个字符串.那我怎么得到我想要的东西.

apache-spark rdd apache-spark-sql

44
推荐指数
6
解决办法
8万
查看次数

从任务中调用Java/Scala函数

背景

我原来的问题是为什么使用DecisionTreeModel.predict内部地图功能会引发异常?并且与如何使用MLlib在Spark上生成(原始标签,预测标签)的元组有关?

当我们使用Scala API时,推荐RDD[LabeledPoint]使用预测的方法DecisionTreeModel是简单地映射RDD:

val labelAndPreds = testData.map { point =>
  val prediction = model.predict(point.features)
  (point.label, prediction)
}
Run Code Online (Sandbox Code Playgroud)

遗憾的是,PySpark中的类似方法效果不佳:

labelsAndPredictions = testData.map(
    lambda lp: (lp.label, model.predict(lp.features))
labelsAndPredictions.first()
Run Code Online (Sandbox Code Playgroud)

例外:您似乎尝试从广播变量,操作或转换引用SparkContext.SparkContext只能在驱动程序上使用,而不能在工作程序上运行的代码中使用.有关更多信息,请参阅SPARK-5063.

而不是官方文档推荐这样的东西:

predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
Run Code Online (Sandbox Code Playgroud)

那么这里发生了什么?此处没有广播变量,Scala API定义predict如下:

/**
 * Predict values for a single data point using the model trained.
 *
 * @param features array representing …
Run Code Online (Sandbox Code Playgroud)

python scala apache-spark pyspark apache-spark-mllib

37
推荐指数
1
解决办法
9913
查看次数

使用类似SQL的IN子句过滤Pyspark DataFrame

我想用类似SQL的IN子句过滤Pyspark DataFrame ,如

sc = SparkContext()
sqlc = SQLContext(sc)
df = sqlc.sql('SELECT * from my_df WHERE field1 IN a')
Run Code Online (Sandbox Code Playgroud)

a元组在哪里(1, 2, 3).我收到此错误:

java.lang.RuntimeException:[1.67]失败:``('''',但是找到了标识符

这基本上是说它期待类似'(1,2,3)'而不是a.问题是我不能手动写入a中的值,因为它是从另一个作业中提取的.

在这种情况下我该如何过滤?

python sql dataframe apache-spark pyspark

37
推荐指数
4
解决办法
5万
查看次数

dataframe:如何groupBy/count然后过滤Scala中的count

Spark 1.4.1

我遇到一种情况,按数据框进行分组,然后对'count'列进行计数和过滤会引发下面的异常

import sqlContext.implicits._
import org.apache.spark.sql._

case class Paf(x:Int)
val myData = Seq(Paf(2), Paf(1), Paf(2))
val df = sc.parallelize(myData, 2).toDF()
Run Code Online (Sandbox Code Playgroud)

然后分组和过滤:

df.groupBy("x").count()
  .filter("count >= 2")
  .show()
Run Code Online (Sandbox Code Playgroud)

引发异常:

java.lang.RuntimeException: [1.7] failure: ``('' expected but `>=' found count >= 2
Run Code Online (Sandbox Code Playgroud)

解:

重命名列会使问题消失(因为我怀疑与插值'count'函数没有冲突'

df.groupBy("x").count()
  .withColumnRenamed("count", "n")
  .filter("n >= 2")
  .show()
Run Code Online (Sandbox Code Playgroud)

那么,这是一种期望的行为,一个错误还是一种规范的方式?

谢谢,亚历克斯

scala apache-spark apache-spark-sql

35
推荐指数
3
解决办法
9万
查看次数