标签: apache-spark-sql

如何检测Spark DataFrame是否具有列

当我DataFrame在Spark SQL中创建一个JSON文件时,如何在调用之前判断给定列是否存在.select

示例JSON模式:

{
  "a": {
    "b": 1,
    "c": 2
  }
}
Run Code Online (Sandbox Code Playgroud)

这就是我想要做的:

potential_columns = Seq("b", "c", "d")
df = sqlContext.read.json(filename)
potential_columns.map(column => if(df.hasColumn(column)) df.select(s"a.$column"))
Run Code Online (Sandbox Code Playgroud)

但我找不到一个好的功能hasColumn.我得到的最接近的是测试列是否在这个有点笨拙的数组中:

scala> df.select("a.*").columns
res17: Array[String] = Array(b, c)
Run Code Online (Sandbox Code Playgroud)

scala dataframe apache-spark apache-spark-sql

39
推荐指数
5
解决办法
5万
查看次数

38
推荐指数
1
解决办法
7万
查看次数

如何在单个加载中导入多个csv文件?

考虑我有一个已定义的架构,用于在文件夹中加载10个csv文件.有没有办法使用Spark SQL自动加载表.我知道这可以通过为每个文件[下面给出]使用单独的数据帧来执行,但是可以使用单个命令自动执行而不是指向文件我可以指向文件夹吗?

df = sqlContext.read
       .format("com.databricks.spark.csv")
       .option("header", "true")
       .load("../Downloads/2008.csv")
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql spark-dataframe

38
推荐指数
5
解决办法
6万
查看次数

在密钥上加入Spark数据帧

我构建了两个数据帧.我们如何加入多个Spark数据帧?

例如 :

PersonDf,ProfileDf使用公共列personId作为(键).现在,我们怎样才能有一个数据帧合并PersonDfProfileDf

scala dataframe apache-spark apache-spark-sql

38
推荐指数
4
解决办法
11万
查看次数

在Apache Spark Join中包含空值

我想在Apache Spark连接中包含空值.Spark默认情况下不包含null的行.

这是默认的Spark行为.

val numbersDf = Seq(
  ("123"),
  ("456"),
  (null),
  ("")
).toDF("numbers")

val lettersDf = Seq(
  ("123", "abc"),
  ("456", "def"),
  (null, "zzz"),
  ("", "hhh")
).toDF("numbers", "letters")

val joinedDf = numbersDf.join(lettersDf, Seq("numbers"))
Run Code Online (Sandbox Code Playgroud)

这是输出joinedDf.show():

+-------+-------+
|numbers|letters|
+-------+-------+
|    123|    abc|
|    456|    def|
|       |    hhh|
+-------+-------+
Run Code Online (Sandbox Code Playgroud)

这是我想要的输出:

+-------+-------+
|numbers|letters|
+-------+-------+
|    123|    abc|
|    456|    def|
|       |    hhh|
|   null|    zzz|
+-------+-------+
Run Code Online (Sandbox Code Playgroud)

sql scala join apache-spark apache-spark-sql

38
推荐指数
3
解决办法
2万
查看次数

如何在没有SQL查询的情况下使用Spark Dataframe检查是否相等?

我想选择一个等于某个值的列.我在scala中做这个并且有点麻烦.

继承我的代码

df.select(df("state")==="TX").show()
Run Code Online (Sandbox Code Playgroud)

这将返回状态列,其中包含布尔值而不仅仅是TX

我也试过了

df.select(df("state")=="TX").show() 
Run Code Online (Sandbox Code Playgroud)

但这也不起作用.

scala dataframe apache-spark apache-spark-sql

37
推荐指数
7
解决办法
10万
查看次数

如何在Spark SQL中定义和使用用户定义的聚合函数?

我知道如何在Spark SQL中编写UDF:

def belowThreshold(power: Int): Boolean = {
        return power < -40
      }

sqlContext.udf.register("belowThreshold", belowThreshold _)
Run Code Online (Sandbox Code Playgroud)

我可以做类似的定义聚合函数吗?这是怎么做到的?

对于上下文,我想运行以下SQL查询:

val aggDF = sqlContext.sql("""SELECT span, belowThreshold(opticalReceivePower), timestamp
                                    FROM ifDF
                                    WHERE opticalReceivePower IS NOT null
                                    GROUP BY span, timestamp
                                    ORDER BY span""")
Run Code Online (Sandbox Code Playgroud)

它应该返回类似的东西

Row(span1, false, T0)

我希望聚合函数告诉我opticalReceivePower在定义的组中是否有任何值span,timestamp哪些值低于阈值.我是否需要以不同的方式将UDAF写入上面粘贴的UDF?

scala aggregate-functions user-defined-functions apache-spark apache-spark-sql

37
推荐指数
1
解决办法
3万
查看次数

如何使用Spark DataFrames查询JSON数据列?

我有一个Cassandra表,为简单起见,看起来像:

key: text
jsonData: text
blobData: blob
Run Code Online (Sandbox Code Playgroud)

我可以使用spark和spark-cassandra-connector为此创建一个基本数据框:

val df = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "mytable", "keyspace" -> "ks1"))
  .load()
Run Code Online (Sandbox Code Playgroud)

我正在努力将JSON数据扩展到其底层结构中.我最终希望能够根据json字符串中的属性进行过滤并返回blob数据.像jsonData.foo ="bar"之类的东西并返回blobData.这目前可能吗?

scala dataframe apache-spark apache-spark-sql spark-cassandra-connector

37
推荐指数
2
解决办法
4万
查看次数

在Spark SQL中自动且优雅地展平DataFrame

所有,

是否有一种优雅且可接受的方式来使用嵌套的列展平Spark SQL表(Parquet) StructType

例如

如果我的架构是:

foo
 |_bar
 |_baz
x
y
z
Run Code Online (Sandbox Code Playgroud)

如何在不依靠手动运行的情况下将其选择为展平的表格形式

df.select("foo.bar","foo.baz","x","y","z")
Run Code Online (Sandbox Code Playgroud)

换句话说,如何在a StructType和a下以编程方式获得上述代码的结果DataFrame

scala apache-spark apache-spark-sql

37
推荐指数
4
解决办法
3万
查看次数

在pyspark中的每个DataFrame组中检索前n个

pyspark中有一个DataFrame,数据如下:

user_id object_id score
user_1  object_1  3
user_1  object_1  1
user_1  object_2  2
user_2  object_1  5
user_2  object_2  2
user_2  object_2  6
Run Code Online (Sandbox Code Playgroud)

我期望在每个组中返回具有相同user_id的2条记录,这些记录需要具有最高分.因此,结果应如下所示:

user_id object_id score
user_1  object_1  3
user_1  object_2  2
user_2  object_2  6
user_2  object_1  5
Run Code Online (Sandbox Code Playgroud)

我是pyspark的新手,有人能给我一个代码片段或门户网站来解决这个问题的相关文档吗?十分感谢!

python dataframe apache-spark apache-spark-sql pyspark

36
推荐指数
2
解决办法
4万
查看次数