Roh*_*ala 2 scala apache-spark spark-structured-streaming
我使用 Spark 2.1.0.cloudera1。
我在流数据帧中有一个数组,数组中的数据如下所示:
["Windows","Ubuntu","Ubuntu","Mac","Mac","Windows","Windows"]
Run Code Online (Sandbox Code Playgroud)
我需要这个数组的大小,不包括元素“Windows”,即
以下是我遵循的方法
WITH os_count AS(
SELECT
cluster_id,
count(e) AS cnt
FROM systems
LATERAL VIEW EXPLODE(all_os) exploded as e
WHERE e <> 'Windows'
GROUP BY cluster_id)
SELECT
a.cluster_id,
a.memory,
a.storage,
c.cnt
FROM
systems a
JOIN
os_count c
ON(a.cluster_id = c.cluster_id)
Run Code Online (Sandbox Code Playgroud)
但是对于我的用例,我的查询中不能有 JOIN,因为 Spark 结构化流在 Spark 2.3 之前没有对 Join 的适当支持
我可以
SELECT SIZE(cluster.all_os) FROM systems
Run Code Online (Sandbox Code Playgroud)
这将返回 7,但我想用“Windows”过滤掉元素,而应该返回 4,不知道如何在不执行连接的情况下继续操作!
我通过在 spark(Scala) 中编写 UDF 来实现这一点,以下是逻辑:
import org.apache.spark.sql.functions._
val osCountFunction: Seq[String] => Int = _.par.filter(_!="Windows").size
val osCountUDF = udf(osCountFunction)
Run Code Online (Sandbox Code Playgroud)
请让我知道是否有更好的方法!
编辑 1
UDF的用法:
val inputStream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("subscribe", topics)
.load()
.selectExpr("CAST(value AS STRING)")
.as[String]
.select(from_json($"value",systemSchema).as("data"))
.withColumn("os_count_with_udf", osCountUDF(col("data.all_os")))
inputStream.createOrReplaceTempView("data_view")
spark.sql("SELECT os_count_with_udf from data_view")
.writeStream
.format("console")
.option("truncate","false")
.start()
Run Code Online (Sandbox Code Playgroud)
注意:data.all_os 是 Array[String] 类型。