小编Dan*_*ula的帖子

Apache Spark:客户端和集群部署模式之间的差异

TL; DR:在Spark Standalone集群中,客户端和集群部署模式之间有什么区别?如何设置应用程序运行的模式?


我们有一个带有三台机器的Spark Standalone集群,它们都使用Spark 1.6.1:

  • 主机,也是我们运行应用程序的地方 spark-submit
  • 2台相同的工人机器

Spark文档中,我读到:

(...)对于独立群集,Spark目前支持两种部署模式.在客户端模式下,驱动程序在与提交应用程序的客户端相同的进程中启动.但是,在集群模式下,驱动程序从集群内的一个工作进程启动,客户端进程在完成其提交应用程序的责任时立即退出,而无需等待应用程序完成.

但是,通过阅读本文,我并不真正了解实际差异,而且我不了解不同部署模式的优点和缺点.

另外,当我使用start-submit启动我的应用程序时,即使我将属性设置spark.submit.deployMode为"cluster",我的上下文的Spark UI也显示以下条目:

上下文UI

所以我无法测试两种模式以查看实际差异.话虽这么说,我的问题是:

1)Spark Standalone 客户端部署模式和集群部署模式之间有哪些实际区别?使用每个人的专业和利益是什么?

2)如何选择我的应用程序将运行哪一个,使用spark-submit

apache-spark apache-spark-standalone

36
推荐指数
2
解决办法
3万
查看次数

Spark Scala:如何转换DF中的列

我在Spark中有一个数据框,有很多列和我定义的udf.我希望返回相同的数据帧,除非转换了一列.此外,我的udf接受一个字符串并返回一个时间戳.是否有捷径可寻?我试过了

val test = myDF.select("my_column").rdd.map(r => getTimestamp(r)) 
Run Code Online (Sandbox Code Playgroud)

但这会返回一个RDD,只返回已转换的列.

scala apache-spark

25
推荐指数
1
解决办法
3万
查看次数

DataFrame错误:"带有替代项的重载方法值过滤器"

我试图通过使用下面的代码过滤掉null或空字符串的行来创建新的数据框:

val df1 = df.filter(df("fieldA") != "").cache()
Run Code Online (Sandbox Code Playgroud)

然后我收到以下错误:

 <console>:32: error: overloaded method value filter with alternatives:
      (conditionExpr: String)org.apache.spark.sql.DataFrame <and>
      (condition: org.apache.spark.sql.Column)org.apache.spark.sql.DataFrame
     cannot be applied to (Boolean)
                  val df1 = df.filter(df("fieldA") != "").cache()
                                 ^
Run Code Online (Sandbox Code Playgroud)

有谁知道我在这里错过了什么?谢谢!

scala dataframe apache-spark

17
推荐指数
1
解决办法
2万
查看次数

使用隐式功能实现Cake Pattern

我有一个场景,我想实现一个Cake Pattern的变体,但是向一个类添加隐式功能(一个Spark DataFrame).

所以,基本上,我希望能够运行如下代码:

trait Transformer {
  this: ColumnAdder =>

  def transform(input: DataFrame): DataFrame = {
    input.addColumn("newCol")
  }
}

val input = sqlContext.range(0, 5)
val transformer = new Transformer with StringColumnAdder
val output = transformer.transform(input)
output.show
Run Code Online (Sandbox Code Playgroud)

并找到如下结果:

+---+------+
| id|newCol|
+---+------+
|  0|newCol|
|  1|newCol|
|  2|newCol|
|  3|newCol|
|  4|newCol|
+---+------+
Run Code Online (Sandbox Code Playgroud)

我的第一个想法是仅在基本特征中定义隐式类:

trait ColumnAdder {
  protected def _addColumn(df: DataFrame, colName: String): DataFrame

  implicit class ColumnAdderRichDataFrame(df: DataFrame) {
    def addColumn(colName: String): DataFrame = _addColumn(df, colName)
  }
}

trait StringColumnAdder extends …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark

10
推荐指数
1
解决办法
330
查看次数

数据集过滤器:eta扩展不会自动完成

如果我有一个简单的Int Scala集合,并且我定义了一个简单的方法 isPositive,如果值大于0则返回true,那么我可以将方法传递给filter集合的方法,如下例所示

def isPositive(i: Int): Boolean = i > 0

val aList = List(-3, -2, -1, 1, 2, 3)
val newList = aList.filter(isPositive)

> newList: List[Int] = List(1, 2, 3)
Run Code Online (Sandbox Code Playgroud)

因此,据我所知,编译器能够通过执行eta扩展自动将方法转换为函数实例,然后将此函数作为参数传递.

但是,如果我使用Spark数据集做同样的事情:

val aDataset = aList.toDS
val newDataset = aDataset.filter(isPositive)

> error
Run Code Online (Sandbox Code Playgroud)

它失败了众所周知的"缺少方法参数"错误.为了使它工作,我必须使用"_"显式地将方法转换为函数:

val newDataset = aDataset.filter(isPositive _)

> newDataset: org.apache.spark.sql.Dataset[Int] = [value: int]
Run Code Online (Sandbox Code Playgroud)

虽然map它按预期工作:

val newDataset = aDataset.map(isPositive)

> newDataset: org.apache.spark.sql.Dataset[Boolean] = [value: boolean]
Run Code Online (Sandbox Code Playgroud)

调查签名,我看到Dataset过滤器的签名与List的过滤器非常相似:

// Dataset:
def filter(func: T => Boolean): …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark

5
推荐指数
0
解决办法
102
查看次数

spark数据帧将列名称连接到值

我有一个数据框,我想以每行包含列名称的方式进行修改.例如 :

FirstName LastName
Jhon       Doe
David      Lue
Run Code Online (Sandbox Code Playgroud)

创造了以下内容

(FirstName=Jhon,LastName=Doe)
(FirstName=David,LastName=Lue)
Run Code Online (Sandbox Code Playgroud)

我设法为df做了2列

val x = df.map { row => (names(0) + "=" +row(0) , names(1)+"="+rows(1)}
Run Code Online (Sandbox Code Playgroud)

但是我怎么能用for循环任意数量的列呢?

谢谢

dataframe apache-spark

2
推荐指数
1
解决办法
1589
查看次数

计算元组中单词的出现次数

我有一个类似于以下示例的数据集:

tmj_dc_mgmt, Washington, en, 483, 457, 256, ['hiring', 'BusinessMgmt', 'Washington', 'Job']
SRiku0728, ???, ja, 6705, 357, 273, ['None']
BesiktaSeyma_, Akyurt, tr, 12921, 1801, 283, ['None']
AnnaKFrick, Virginia, en, 5731, 682, 1120, ['Investment', 'PPP', 'Bogota', 'jobs']
Accprimary, Manchester, en, 1650, 268, 404, ['None']
Wandii_S, Johannesburg, en, 15510, 828, 398, ['None']
Run Code Online (Sandbox Code Playgroud)

方括号内的记录是主题标签(不包括"无").

我正在尝试使用Spark和Scala在数据集中找到前10个主题标签.

我达到了这个目的:

val file = sc.textFile("/data")
val tmp1 = file
  .map(_.split(","))
  .map( p=>p(6))
  .map(_.replaceAll("\\[|\\]",""))
  .map(_.replaceAll("'",""))
  .filter(x => x != " None")
  .map(word => (word, 1))
  .reduceByKey(_ + _)
Run Code Online (Sandbox Code Playgroud)

我不知道如何对此进行排序并从中排名前10位,我是Scala和Spark的新手.

任何帮助,将不胜感激.

scala apache-spark

0
推荐指数
1
解决办法
177
查看次数