如何计算流数据集中数组字段中元素的数量(一个除外)?

Roh*_*ala 2 scala apache-spark spark-structured-streaming

我使用 Spark 2.1.0.cloudera1。

我在流数据帧中有一个数组,数组中的数据如下所示:

["Windows","Ubuntu","Ubuntu","Mac","Mac","Windows","Windows"]
Run Code Online (Sandbox Code Playgroud)

我需要这个数组的大小,不包括元素“Windows”,即

以下是我遵循的方法

WITH os_count AS(
SELECT  
    cluster_id,
    count(e) AS cnt
FROM systems
LATERAL VIEW EXPLODE(all_os) exploded as e
WHERE e <> 'Windows'
GROUP BY cluster_id)

SELECT
    a.cluster_id,
    a.memory,
    a.storage,
    c.cnt
FROM
    systems a
JOIN
    os_count c
ON(a.cluster_id = c.cluster_id)
Run Code Online (Sandbox Code Playgroud)

但是对于我的用例,我的查询中不能有 JOIN,因为 Spark 结构化流在 Spark 2.3 之前没有对 Join 的适当支持

我可以

SELECT SIZE(cluster.all_os) FROM systems
Run Code Online (Sandbox Code Playgroud)

这将返回 7,但我想用“Windows”过滤掉元素,而应该返回 4,不知道如何在不执行连接的情况下继续操作!

Roh*_*ala 5

我通过在 spark(Scala) 中编写 UDF 来实现这一点,以下是逻辑:

    import org.apache.spark.sql.functions._

    val osCountFunction: Seq[String] => Int = _.par.filter(_!="Windows").size
    val osCountUDF = udf(osCountFunction)
Run Code Online (Sandbox Code Playgroud)

请让我知道是否有更好的方法!

编辑 1

UDF的用法:

val inputStream = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topics)
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]
      .select(from_json($"value",systemSchema).as("data"))
      .withColumn("os_count_with_udf", osCountUDF(col("data.all_os")))

inputStream.createOrReplaceTempView("data_view")
spark.sql("SELECT os_count_with_udf from data_view")
      .writeStream
      .format("console")
      .option("truncate","false")
      .start()
Run Code Online (Sandbox Code Playgroud)

注意:data.all_os 是 Array[String] 类型。

  • 您最好将 https://docs.databricks.com/_static/notebooks/apache-spark-2.4-functions.html 升级到 2.4.0 :) (2认同)