如何将完全格式化的 SQL 与 Spark 结构化流结合使用

jav*_*dba 4 scala apache-spark spark-structured-streaming

Spark 结构化流的文档说 - 从 spark 2.3 开始,可用于static DataFrame /的 spark 上下文中的所有方法DataSet也可用于结构化流 DataFrame/ DataSet。但是,我还没有遇到任何相同的例子

对我来说,使用完整格式的 SQL 比DSL. 此外,对于我的用例,这些 SQL 已经针对静态版本进行了开发和测试。还有必须一些返工-尤其是使用join到位号第correlated subqueries。然而,保留整体完整的 sql 结构仍然有很大的价值。

我希望使用的格式就像这个假设的连接:

 val tabaDf = spark.readStream(..)
 val tabbDf = spark.readStream(..)

 val joinSql = """select a.*, 
                  b.productName 
                  from taba
                  join tabb 
                  on a.productId = b.productId
                  where ..
                  group by ..
                  having ..
                  order by .."""
 val joinedStreamingDf = spark.sql(joinSql)
Run Code Online (Sandbox Code Playgroud)

有几个项目不清楚如何做:

  • tabaDftabbDf应该是通过定义spark.readStream:这是我的假设

  • 如何申报tabatabb。尝试使用

    tabaDf.createOrReplaceTempView("taba")
    tabbDf.createOrReplaceTempView("tabb")
    
    Run Code Online (Sandbox Code Playgroud)

    结果是

    警告 ObjectStore:无法获取数据库 global_temp,返回 NoSuchObjectException

我能找到的所有示例都使用DSL和/或selectExpr()- 如下所示https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming- in-apache-spark-2-2.html

 df.selectExpr("CAST(userId AS STRING) AS key", "to_json(struct(*)) AS value")
Run Code Online (Sandbox Code Playgroud)

或使用select

sightingLoc
  .groupBy("zip_code", window("start_time", "1 hour"))
  .count()
  .select( 
    to_json(struct("zip_code", "window")).alias("key"),
    col("count").cast("string").alias("value")) 
Run Code Online (Sandbox Code Playgroud)

这些真的是唯一的选择吗- 以至于文档中说数据框/数据集支持的所有方法static都不是真的准确?否则:关于如何纠正上述问题并直接sql使用流媒体的任何指示将不胜感激。

oll*_*ik1 5

需要使用 将流注册为临时视图createOrReplaceTempView。AFAIKcreateOrReplaceView不是 Spark API 的一部分(也许您有一些东西可以提供到具有此类方法的类的隐式转换)。

spark.readStream(..).createOrReplaceTempView("taba")
spark.readStream(..).createOrReplaceTempView("tabb")
Run Code Online (Sandbox Code Playgroud)

现在可以使用纯 SQL 访问视图。例如,要将输出打印到控制台:

spark
  .sql(joinSql)
  .writeStream
  .format("console")
  .start()
  .awaitTermination()
Run Code Online (Sandbox Code Playgroud)

编辑:问题编辑后,我看不出你的代码有什么问题。这是一个最小的工作示例。假设一个测试文件/tmp/foo/foo.csv

"a",1
"b",2
Run Code Online (Sandbox Code Playgroud)
import org.apache.spark.sql.types._
val schema = StructType(Array(StructField("s", StringType), StructField("i", IntegerType)))
spark.readStream
  .schema(schema)
  .csv("/tmp/foo")
  .createOrReplaceTempView("df1")
spark.readStream
  .schema(schema)
  .csv("/tmp/foo")
  .createOrReplaceTempView("df2")

spark.sql("SELECT * FROM df1 JOIN df2 USING (s)")
  .writeStream
  .format("console")
  .start()
  .awaitTermination()
Run Code Online (Sandbox Code Playgroud)

产出

-------------------------------------------
Batch: 0
-------------------------------------------
+---+---+---+
|  s|  i|  i|
+---+---+---+
|  b|  2|  2|
|  a|  1|  1|
+---+---+---+
Run Code Online (Sandbox Code Playgroud)