我开始使用Spark DataFrames,我需要能够透过数据来创建多列的1列中的多列.在Scalding中有内置的功能,我相信Python中的Pandas,但我找不到任何新的Spark Dataframe.
我假设我可以编写某种类型的自定义函数,但是我甚至不确定如何启动,特别是因为我是Spark的新手.我有人知道如何使用内置功能或如何在Scala中编写内容的建议,非常感谢.
我正在 Scala 中完成一些 Sapark 项目,我看到所有对象都在扩展Serializable.
就像是 :
object someName extends Serializable {
//some code
}
Run Code Online (Sandbox Code Playgroud)
我了解序列化通常用于存储或通信数据结构,以便数据可以轻松地以原始形式从序列化形式加载到内存中。然而,在这种情况下,object它更像是一个 Java 类。那么,扩展的意义或优势是Serializable什么?你什么时候做这个?有必要一直这样做吗?
在执行某些函数后,为什么nullable = true?df中仍然没有纳米值.
val myDf = Seq((2,"A"),(2,"B"),(1,"C"))
.toDF("foo","bar")
.withColumn("foo", 'foo.cast("Int"))
myDf.withColumn("foo_2", when($"foo" === 2 , 1).otherwise(0)).select("foo", "foo_2").show
Run Code Online (Sandbox Code Playgroud)
当nullable = true被调用时,nullable对于两列都是false.
val foo: (Int => String) = (t: Int) => {
fooMap.get(t) match {
case Some(tt) => tt
case None => "notFound"
}
}
val fooMap = Map(
1 -> "small",
2 -> "big"
)
val fooUDF = udf(foo)
myDf
.withColumn("foo", fooUDF(col("foo")))
.withColumn("foo_2", when($"foo" === 2 , 1).otherwise(0)).select("foo", "foo_2")
.select("foo", "foo_2")
.printSchema
Run Code Online (Sandbox Code Playgroud)
但是现在,对于至少一个之前为假的列,可以为空.怎么解释这个?
我拉了 docker 镜像并执行下面的命令来运行镜像。
docker run -it bitnami/spark:latest /bin/bash
spark-shell --packages="org.elasticsearch:elasticsearch-spark-20_2.11:7.5.0"
我收到如下消息
Ivy Default Cache set to: /opt/bitnami/spark/.ivy2/cache
The jars for the packages stored in: /opt/bitnami/spark/.ivy2/jars
:: loading settings :: url = jar:file:/opt/bitnami/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.elasticsearch#elasticsearch-spark-20_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-c785f3e6-7c78-469f-ab46-451f8be61a4c;1.0
confs: [default]
Exception in thread "main" java.io.FileNotFoundException: /opt/bitnami/spark/.ivy2/cache/resolved-org.apache.spark-spark-submit-parent-c785f3e6-7c78-469f-ab46-451f8be61a4c-1.0.xml (No such file or directory)
at java.io.FileOutputStream.open0(Native Method)
at java.io.FileOutputStream.open(FileOutputStream.java:270)
at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
at org.apache.ivy.plugins.parser.xml.XmlModuleDescriptorWriter.write(XmlModuleDescriptorWriter.java:70)
at org.apache.ivy.plugins.parser.xml.XmlModuleDescriptorWriter.write(XmlModuleDescriptorWriter.java:62)
at org.apache.ivy.core.module.descriptor.DefaultModuleDescriptor.toIvyFile(DefaultModuleDescriptor.java:563)
at org.apache.ivy.core.cache.DefaultResolutionCacheManager.saveResolvedModuleDescriptor(DefaultResolutionCacheManager.java:176)
at org.apache.ivy.core.resolve.ResolveEngine.resolve(ResolveEngine.java:245)
at org.apache.ivy.Ivy.resolve(Ivy.java:523)
at org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(SparkSubmit.scala:1300)
at org.apache.spark.deploy.DependencyUtils$.resolveMavenDependencies(DependencyUtils.scala:54) …Run Code Online (Sandbox Code Playgroud) 当我posexplode()在 Spark SQL 中使用函数时,以下语句生成“pos”和“col”作为默认名称
scala> spark.sql(""" with t1(select to_date('2019-01-01') first_day) select first_day,date_sub(add_months(first_day,1),1) last_day, posexplode(array(5,6,7)) from t1 """).show(false)
+----------+----------+---+---+
|first_day |last_day |pos|col|
+----------+----------+---+---+
|2019-01-01|2019-01-31|0 |5 |
|2019-01-01|2019-01-31|1 |6 |
|2019-01-01|2019-01-31|2 |7 |
+----------+----------+---+---+
Run Code Online (Sandbox Code Playgroud)
在 spark.sql 中覆盖这些默认名称的语法是什么?在数据帧中,这可以通过给出df.explode(select 'arr.as(Seq("arr_val","arr_pos")))
scala> val arr= Array(5,6,7)
arr: Array[Int] = Array(5, 6, 7)
scala> Seq(("dummy")).toDF("x").select(posexplode(lit(arr)).as(Seq("arr_val","arr_pos"))).show(false)
+-------+-------+
|arr_val|arr_pos|
+-------+-------+
|0 |5 |
|1 |6 |
|2 |7 |
+-------+-------+
Run Code Online (Sandbox Code Playgroud)
如何在 SQL 中得到它?我试过
spark.sql(""" with t1(select to_date('2011-01-01') first_day) select first_day,date_sub(add_months(first_day,1),1) last_day, posexplode(array(5,6,7)) as(Seq('p','c')) from t1 """).show(false) …Run Code Online (Sandbox Code Playgroud) 我正在一个Spark ML管道上工作,在该管道上我们会在较大的数据集上看到OOM错误。在训练之前我们正在使用cache(); 我换了一下checkpoint(),我们的内存需求大大下降了。然而,在文档进行RDD的checkpoint(),它说:
强烈建议将该RDD保留在内存中,否则将其保存在文件中将需要重新计算。
DataSet我正在使用的检查点未提供相同的指导。无论如何,遵循以上建议,我发现cache()单独使用内存的需求实际上有所增加。
我的期望是当我们这样做时
...
ds.cache()
ds.checkpoint()
...
Run Code Online (Sandbox Code Playgroud)
对检查点的调用会强制对进行评估,该评估会DataSet在被检查点之前同时缓存。之后,任何对的引用都ds将引用缓存的分区,并且如果需要更多的内存并且将分区撤离,将使用检查点分区,而不是重新评估它们。这是真的吗,还是在幕后发生了什么变化?理想情况下,如果可能的话,我希望将DataSet保留在内存中,但是从内存的角度来看,使用缓存和检查点方法似乎没有任何好处。
我有一个包含一些分类字符串列的数据集,我想用双精度类型表示它们。我使用 StringIndexer 进行此转换并且它有效,但是当我在另一个具有 NULL 值的数据集中尝试它时,它给出了java.lang.NullPointerException错误并且不起作用。
为了更好地理解这里是我的代码:
for(col <- cols){
out_name = col ++ "_"
var indexer = new StringIndexer().setInputCol(col).setOutputCol(out_name)
var indexed = indexer.fit(df).transform(df)
df = (indexed.withColumn(col, indexed(out_name))).drop(out_name)
}
Run Code Online (Sandbox Code Playgroud)
那么如何使用 StringIndexer 解决这个 NULL 数据问题呢?
或者是否有更好的解决方案将具有 NULL 值的字符串类型的分类数据转换为 double?
我想声明一个返回 2 1D 数组或 1 2D 数组的 udf(两者的示例都很好)。我知道这适用于 1D:
@udf("array<int>")
Run Code Online (Sandbox Code Playgroud)
但是,我尝试了许多变体,例如以下没有运气:
@udf("array<int>,array<int>")
@udf("array<int>","array<int>")
@udf("array<int,int>")
etc.
Run Code Online (Sandbox Code Playgroud) user-defined-functions apache-spark apache-spark-sql pyspark