TL; DR:在Spark Standalone集群中,客户端和集群部署模式之间有什么区别?如何设置应用程序运行的模式?
我们有一个带有三台机器的Spark Standalone集群,它们都使用Spark 1.6.1:
spark-submit
从Spark文档中,我读到:
(...)对于独立群集,Spark目前支持两种部署模式.在客户端模式下,驱动程序在与提交应用程序的客户端相同的进程中启动.但是,在集群模式下,驱动程序从集群内的一个工作进程启动,客户端进程在完成其提交应用程序的责任时立即退出,而无需等待应用程序完成.
但是,通过阅读本文,我并不真正了解实际差异,而且我不了解不同部署模式的优点和缺点.
另外,当我使用start-submit启动我的应用程序时,即使我将属性设置spark.submit.deployMode
为"cluster",我的上下文的Spark UI也显示以下条目:
所以我无法测试两种模式以查看实际差异.话虽这么说,我的问题是:
1)Spark Standalone 客户端部署模式和集群部署模式之间有哪些实际区别?使用每个人的专业和利益是什么?
2)如何选择我的应用程序将运行哪一个,使用spark-submit
?
我在Spark中有一个数据框,有很多列和我定义的udf.我希望返回相同的数据帧,除非转换了一列.此外,我的udf接受一个字符串并返回一个时间戳.是否有捷径可寻?我试过了
val test = myDF.select("my_column").rdd.map(r => getTimestamp(r))
Run Code Online (Sandbox Code Playgroud)
但这会返回一个RDD,只返回已转换的列.
我试图通过使用下面的代码过滤掉null或空字符串的行来创建新的数据框:
val df1 = df.filter(df("fieldA") != "").cache()
Run Code Online (Sandbox Code Playgroud)
然后我收到以下错误:
<console>:32: error: overloaded method value filter with alternatives:
(conditionExpr: String)org.apache.spark.sql.DataFrame <and>
(condition: org.apache.spark.sql.Column)org.apache.spark.sql.DataFrame
cannot be applied to (Boolean)
val df1 = df.filter(df("fieldA") != "").cache()
^
Run Code Online (Sandbox Code Playgroud)
有谁知道我在这里错过了什么?谢谢!
我有一个场景,我想实现一个Cake Pattern的变体,但是向一个类添加隐式功能(一个Spark DataFrame).
所以,基本上,我希望能够运行如下代码:
trait Transformer {
this: ColumnAdder =>
def transform(input: DataFrame): DataFrame = {
input.addColumn("newCol")
}
}
val input = sqlContext.range(0, 5)
val transformer = new Transformer with StringColumnAdder
val output = transformer.transform(input)
output.show
Run Code Online (Sandbox Code Playgroud)
并找到如下结果:
+---+------+
| id|newCol|
+---+------+
| 0|newCol|
| 1|newCol|
| 2|newCol|
| 3|newCol|
| 4|newCol|
+---+------+
Run Code Online (Sandbox Code Playgroud)
我的第一个想法是仅在基本特征中定义隐式类:
trait ColumnAdder {
protected def _addColumn(df: DataFrame, colName: String): DataFrame
implicit class ColumnAdderRichDataFrame(df: DataFrame) {
def addColumn(colName: String): DataFrame = _addColumn(df, colName)
}
}
trait StringColumnAdder extends …
Run Code Online (Sandbox Code Playgroud) 如果我有一个简单的Int Scala集合,并且我定义了一个简单的方法 isPositive
,如果值大于0则返回true,那么我可以将方法传递给filter
集合的方法,如下例所示
def isPositive(i: Int): Boolean = i > 0
val aList = List(-3, -2, -1, 1, 2, 3)
val newList = aList.filter(isPositive)
> newList: List[Int] = List(1, 2, 3)
Run Code Online (Sandbox Code Playgroud)
因此,据我所知,编译器能够通过执行eta扩展自动将方法转换为函数实例,然后将此函数作为参数传递.
但是,如果我使用Spark数据集做同样的事情:
val aDataset = aList.toDS
val newDataset = aDataset.filter(isPositive)
> error
Run Code Online (Sandbox Code Playgroud)
它失败了众所周知的"缺少方法参数"错误.为了使它工作,我必须使用"_"显式地将方法转换为函数:
val newDataset = aDataset.filter(isPositive _)
> newDataset: org.apache.spark.sql.Dataset[Int] = [value: int]
Run Code Online (Sandbox Code Playgroud)
虽然map
它按预期工作:
val newDataset = aDataset.map(isPositive)
> newDataset: org.apache.spark.sql.Dataset[Boolean] = [value: boolean]
Run Code Online (Sandbox Code Playgroud)
调查签名,我看到Dataset过滤器的签名与List的过滤器非常相似:
// Dataset:
def filter(func: T => Boolean): …
Run Code Online (Sandbox Code Playgroud) 我有一个数据框,我想以每行包含列名称的方式进行修改.例如 :
FirstName LastName
Jhon Doe
David Lue
Run Code Online (Sandbox Code Playgroud)
创造了以下内容
(FirstName=Jhon,LastName=Doe)
(FirstName=David,LastName=Lue)
Run Code Online (Sandbox Code Playgroud)
我设法为df做了2列
val x = df.map { row => (names(0) + "=" +row(0) , names(1)+"="+rows(1)}
Run Code Online (Sandbox Code Playgroud)
但是我怎么能用for循环任意数量的列呢?
谢谢
我有一个类似于以下示例的数据集:
tmj_dc_mgmt, Washington, en, 483, 457, 256, ['hiring', 'BusinessMgmt', 'Washington', 'Job']
SRiku0728, ???, ja, 6705, 357, 273, ['None']
BesiktaSeyma_, Akyurt, tr, 12921, 1801, 283, ['None']
AnnaKFrick, Virginia, en, 5731, 682, 1120, ['Investment', 'PPP', 'Bogota', 'jobs']
Accprimary, Manchester, en, 1650, 268, 404, ['None']
Wandii_S, Johannesburg, en, 15510, 828, 398, ['None']
Run Code Online (Sandbox Code Playgroud)
方括号内的记录是主题标签(不包括"无").
我正在尝试使用Spark和Scala在数据集中找到前10个主题标签.
我达到了这个目的:
val file = sc.textFile("/data")
val tmp1 = file
.map(_.split(","))
.map( p=>p(6))
.map(_.replaceAll("\\[|\\]",""))
.map(_.replaceAll("'",""))
.filter(x => x != " None")
.map(word => (word, 1))
.reduceByKey(_ + _)
Run Code Online (Sandbox Code Playgroud)
我不知道如何对此进行排序并从中排名前10位,我是Scala和Spark的新手.
任何帮助,将不胜感激.