当我DataFrame在Spark SQL中创建一个JSON文件时,如何在调用之前判断给定列是否存在.select
示例JSON模式:
{
"a": {
"b": 1,
"c": 2
}
}
Run Code Online (Sandbox Code Playgroud)
这就是我想要做的:
potential_columns = Seq("b", "c", "d")
df = sqlContext.read.json(filename)
potential_columns.map(column => if(df.hasColumn(column)) df.select(s"a.$column"))
Run Code Online (Sandbox Code Playgroud)
但我找不到一个好的功能hasColumn.我得到的最接近的是测试列是否在这个有点笨拙的数组中:
scala> df.select("a.*").columns
res17: Array[String] = Array(b, c)
Run Code Online (Sandbox Code Playgroud) 有人可以分享一下如何将一个转换dataframe成一个RDD?
考虑我有一个已定义的架构,用于在文件夹中加载10个csv文件.有没有办法使用Spark SQL自动加载表.我知道这可以通过为每个文件[下面给出]使用单独的数据帧来执行,但是可以使用单个命令自动执行而不是指向文件我可以指向文件夹吗?
df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true")
.load("../Downloads/2008.csv")
Run Code Online (Sandbox Code Playgroud) 我构建了两个数据帧.我们如何加入多个Spark数据帧?
例如 :
PersonDf,ProfileDf使用公共列personId作为(键).现在,我们怎样才能有一个数据帧合并PersonDf和ProfileDf?
我想在Apache Spark连接中包含空值.Spark默认情况下不包含null的行.
这是默认的Spark行为.
val numbersDf = Seq(
("123"),
("456"),
(null),
("")
).toDF("numbers")
val lettersDf = Seq(
("123", "abc"),
("456", "def"),
(null, "zzz"),
("", "hhh")
).toDF("numbers", "letters")
val joinedDf = numbersDf.join(lettersDf, Seq("numbers"))
Run Code Online (Sandbox Code Playgroud)
这是输出joinedDf.show():
+-------+-------+
|numbers|letters|
+-------+-------+
| 123| abc|
| 456| def|
| | hhh|
+-------+-------+
Run Code Online (Sandbox Code Playgroud)
这是我想要的输出:
+-------+-------+
|numbers|letters|
+-------+-------+
| 123| abc|
| 456| def|
| | hhh|
| null| zzz|
+-------+-------+
Run Code Online (Sandbox Code Playgroud) 我想选择一个等于某个值的列.我在scala中做这个并且有点麻烦.
继承我的代码
df.select(df("state")==="TX").show()
Run Code Online (Sandbox Code Playgroud)
这将返回状态列,其中包含布尔值而不仅仅是TX
我也试过了
df.select(df("state")=="TX").show()
Run Code Online (Sandbox Code Playgroud)
但这也不起作用.
我知道如何在Spark SQL中编写UDF:
def belowThreshold(power: Int): Boolean = {
return power < -40
}
sqlContext.udf.register("belowThreshold", belowThreshold _)
Run Code Online (Sandbox Code Playgroud)
我可以做类似的定义聚合函数吗?这是怎么做到的?
对于上下文,我想运行以下SQL查询:
val aggDF = sqlContext.sql("""SELECT span, belowThreshold(opticalReceivePower), timestamp
FROM ifDF
WHERE opticalReceivePower IS NOT null
GROUP BY span, timestamp
ORDER BY span""")
Run Code Online (Sandbox Code Playgroud)
它应该返回类似的东西
Row(span1, false, T0)
我希望聚合函数告诉我opticalReceivePower在定义的组中是否有任何值span,timestamp哪些值低于阈值.我是否需要以不同的方式将UDAF写入上面粘贴的UDF?
scala aggregate-functions user-defined-functions apache-spark apache-spark-sql
我有一个Cassandra表,为简单起见,看起来像:
key: text
jsonData: text
blobData: blob
Run Code Online (Sandbox Code Playgroud)
我可以使用spark和spark-cassandra-connector为此创建一个基本数据框:
val df = sqlContext.read
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "mytable", "keyspace" -> "ks1"))
.load()
Run Code Online (Sandbox Code Playgroud)
我正在努力将JSON数据扩展到其底层结构中.我最终希望能够根据json字符串中的属性进行过滤并返回blob数据.像jsonData.foo ="bar"之类的东西并返回blobData.这目前可能吗?
scala dataframe apache-spark apache-spark-sql spark-cassandra-connector
所有,
是否有一种优雅且可接受的方式来使用嵌套的列展平Spark SQL表(Parquet) StructType
例如
如果我的架构是:
foo
|_bar
|_baz
x
y
z
Run Code Online (Sandbox Code Playgroud)
如何在不依靠手动运行的情况下将其选择为展平的表格形式
df.select("foo.bar","foo.baz","x","y","z")
Run Code Online (Sandbox Code Playgroud)
换句话说,如何在a StructType和a下以编程方式获得上述代码的结果DataFrame
pyspark中有一个DataFrame,数据如下:
user_id object_id score
user_1 object_1 3
user_1 object_1 1
user_1 object_2 2
user_2 object_1 5
user_2 object_2 2
user_2 object_2 6
Run Code Online (Sandbox Code Playgroud)
我期望在每个组中返回具有相同user_id的2条记录,这些记录需要具有最高分.因此,结果应如下所示:
user_id object_id score
user_1 object_1 3
user_1 object_2 2
user_2 object_2 6
user_2 object_1 5
Run Code Online (Sandbox Code Playgroud)
我是pyspark的新手,有人能给我一个代码片段或门户网站来解决这个问题的相关文档吗?十分感谢!
apache-spark ×10
apache-spark-sql ×10
scala ×8
dataframe ×5
join ×1
pyspark ×1
python ×1
sql ×1