小编use*_*362的帖子

如何转动DataFrame?

我开始使用Spark DataFrames,我需要能够透过数据来创建多列的1列中的多列.在Scalding中有内置的功能,我相信Python中的Pandas,但我找不到任何新的Spark Dataframe.

我假设我可以编写某种类型的自定义函数,但是我甚至不确定如何启动,特别是因为我是Spark的新手.我有人知道如何使用内置功能或如何在Scala中编写内容的建议,非常感谢.

pivot scala dataframe apache-spark apache-spark-sql

52
推荐指数
5
解决办法
4万
查看次数

为什么要在 Scala 中扩展 Serializable?

我正在 Scala 中完成一些 Sapark 项目,我看到所有对象都在扩展Serializable.

就像是 :

object someName extends Serializable {
//some code
}
Run Code Online (Sandbox Code Playgroud)

我了解序列化通常用于存储或通信数据结构,以便数据可以轻松地以原始形式从序列化形式加载到内存中。然而,在这种情况下,object它更像是一个 Java 类。那么,扩展的意义或优势是Serializable什么?你什么时候做这个?有必要一直这样做吗?

java serialization scala deserialization

9
推荐指数
2
解决办法
5736
查看次数

火花为什么列变为可空的真实

在执行某些函数后,为什么nullable = true?df中仍然没有纳米值.

val myDf = Seq((2,"A"),(2,"B"),(1,"C"))
         .toDF("foo","bar")
         .withColumn("foo", 'foo.cast("Int"))

myDf.withColumn("foo_2", when($"foo" === 2 , 1).otherwise(0)).select("foo", "foo_2").show
Run Code Online (Sandbox Code Playgroud)

nullable = true被调用时,nullable对于两列都是false.

val foo: (Int => String) = (t: Int) => {
    fooMap.get(t) match {
      case Some(tt) => tt
      case None => "notFound"
    }
  }

val fooMap = Map(
    1 -> "small",
    2 -> "big"
 )
val fooUDF = udf(foo)

myDf
    .withColumn("foo", fooUDF(col("foo")))
    .withColumn("foo_2", when($"foo" === 2 , 1).otherwise(0)).select("foo", "foo_2")
    .select("foo", "foo_2")
    .printSchema
Run Code Online (Sandbox Code Playgroud)

但是现在,对于至少一个之前为假的列,可以为空.怎么解释这个?

apache-spark apache-spark-sql apache-spark-dataset

8
推荐指数
1
解决办法
3848
查看次数

我无法在 bitnami/spark docker 容器上使用 --package 选项

我拉了 docker 镜像并执行下面的命令来运行镜像。

  1. docker run -it bitnami/spark:latest /bin/bash

  2. spark-shell --packages="org.elasticsearch:elasticsearch-spark-20_2.11:7.5.0"

我收到如下消息

Ivy Default Cache set to: /opt/bitnami/spark/.ivy2/cache
The jars for the packages stored in: /opt/bitnami/spark/.ivy2/jars
:: loading settings :: url = jar:file:/opt/bitnami/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.elasticsearch#elasticsearch-spark-20_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-c785f3e6-7c78-469f-ab46-451f8be61a4c;1.0
        confs: [default]
Exception in thread "main" java.io.FileNotFoundException: /opt/bitnami/spark/.ivy2/cache/resolved-org.apache.spark-spark-submit-parent-c785f3e6-7c78-469f-ab46-451f8be61a4c-1.0.xml (No such file or directory)
        at java.io.FileOutputStream.open0(Native Method)
        at java.io.FileOutputStream.open(FileOutputStream.java:270)
        at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
        at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
        at org.apache.ivy.plugins.parser.xml.XmlModuleDescriptorWriter.write(XmlModuleDescriptorWriter.java:70)
        at org.apache.ivy.plugins.parser.xml.XmlModuleDescriptorWriter.write(XmlModuleDescriptorWriter.java:62)
        at org.apache.ivy.core.module.descriptor.DefaultModuleDescriptor.toIvyFile(DefaultModuleDescriptor.java:563)
        at org.apache.ivy.core.cache.DefaultResolutionCacheManager.saveResolvedModuleDescriptor(DefaultResolutionCacheManager.java:176)
        at org.apache.ivy.core.resolve.ResolveEngine.resolve(ResolveEngine.java:245)
        at org.apache.ivy.Ivy.resolve(Ivy.java:523)
        at org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(SparkSubmit.scala:1300)
        at org.apache.spark.deploy.DependencyUtils$.resolveMavenDependencies(DependencyUtils.scala:54) …
Run Code Online (Sandbox Code Playgroud)

elasticsearch docker apache-spark

7
推荐指数
1
解决办法
1331
查看次数

如何在 Spark SQL 中为posexplode 列指定别名?

当我posexplode()在 Spark SQL 中使用函数时,以下语句生成“pos”和“col”作为默认名称

scala> spark.sql(""" with t1(select to_date('2019-01-01') first_day) select first_day,date_sub(add_months(first_day,1),1) last_day, posexplode(array(5,6,7)) from t1 """).show(false)
+----------+----------+---+---+
|first_day |last_day  |pos|col|
+----------+----------+---+---+
|2019-01-01|2019-01-31|0  |5  |
|2019-01-01|2019-01-31|1  |6  |
|2019-01-01|2019-01-31|2  |7  |
+----------+----------+---+---+
Run Code Online (Sandbox Code Playgroud)

在 spark.sql 中覆盖这些默认名称的语法是什么?在数据帧中,这可以通过给出df.explode(select 'arr.as(Seq("arr_val","arr_pos")))

scala> val arr= Array(5,6,7)
arr: Array[Int] = Array(5, 6, 7)

scala> Seq(("dummy")).toDF("x").select(posexplode(lit(arr)).as(Seq("arr_val","arr_pos"))).show(false)
+-------+-------+
|arr_val|arr_pos|
+-------+-------+
|0      |5      |
|1      |6      |
|2      |7      |
+-------+-------+
Run Code Online (Sandbox Code Playgroud)

如何在 SQL 中得到它?我试过

spark.sql(""" with t1(select to_date('2011-01-01') first_day) select first_day,date_sub(add_months(first_day,1),1) last_day, posexplode(array(5,6,7)) as(Seq('p','c')) from t1 """).show(false) …
Run Code Online (Sandbox Code Playgroud)

sql apache-spark apache-spark-sql

5
推荐指数
1
解决办法
4612
查看次数

是否应该在数据集上同时使用缓存和检查点?如果是这样,它如何在后台运行?

我正在一个Spark ML管道上工作,在该管道上我们会在较大的数据集上看到OOM错误。在训练之前我们正在使用cache(); 我换了一下checkpoint(),我们的内存需求大大下降了。然而,在文档进行RDDcheckpoint(),它说:

强烈建议将该RDD保留在内存中,否则将其保存在文件中将需要重新计算。

DataSet我正在使用的检查点未提供相同的指导。无论如何,遵循以上建议,我发现cache()单独使用内存的需求实际上有所增加。

我的期望是当我们这样做时

...
ds.cache()
ds.checkpoint()
...
Run Code Online (Sandbox Code Playgroud)

对检查点的调用会强制对进行评估,该评估会DataSet在被检查点之前同时缓存。之后,任何对的引用都ds将引用缓存的分区,并且如果需要更多的内存并且将分区撤离,将使用检查点分区,而不是重新评估它们。这是真的吗,还是在幕后发生了什么变化?理想情况下,如果可能的话,我希望将DataSet保留在内存中,但是从内存的角度来看,使用缓存和检查点方法似乎没有任何好处。

apache-spark apache-spark-sql apache-spark-dataset

5
推荐指数
1
解决办法
124
查看次数

在 Spark StringIndexer 中处理 NULL 值

我有一个包含一些分类字符串列的数据集,我想用双精度类型表示它们。我使用 StringIndexer 进行此转换并且它有效,但是当我在另一个具有 NULL 值的数据集中尝试它时,它给出了java.lang.NullPointerException错误并且不起作用。

为了更好地理解这里是我的代码:

for(col <- cols){
    out_name = col ++ "_"
    var indexer = new StringIndexer().setInputCol(col).setOutputCol(out_name)
    var indexed = indexer.fit(df).transform(df)
    df = (indexed.withColumn(col, indexed(out_name))).drop(out_name)
}
Run Code Online (Sandbox Code Playgroud)

那么如何使用 StringIndexer 解决这个 NULL 数据问题呢?

或者是否有更好的解决方案将具有 NULL 值的字符串类型的分类数据转换为 double?

scala categorical-data apache-spark apache-spark-ml

4
推荐指数
1
解决办法
4860
查看次数

2 个列表和/或 1 个二维数组的 udf 声明

我想声明一个返回 2 1D 数组或 1 2D 数组的 udf(两者的示例都很好)。我知道这适用于 1D:

@udf("array<int>")
Run Code Online (Sandbox Code Playgroud)

但是,我尝试了许多变体,例如以下没有运气:

@udf("array<int>,array<int>")
@udf("array<int>","array<int>")
@udf("array<int,int>")
etc. 
Run Code Online (Sandbox Code Playgroud)

user-defined-functions apache-spark apache-spark-sql pyspark

1
推荐指数
1
解决办法
560
查看次数