jav*_*dba 4 scala apache-spark spark-structured-streaming
Spark 结构化流的文档说 - 从 spark 2.3 开始,可用于static DataFrame /的 spark 上下文中的所有方法DataSet也可用于结构化流 DataFrame/ DataSet。但是,我还没有遇到任何相同的例子。
对我来说,使用完整格式的 SQL 比DSL. 此外,对于我的用例,这些 SQL 已经针对静态版本进行了开发和测试。还有必须是一些返工-尤其是使用join到位号第correlated subqueries。然而,保留整体完整的 sql 结构仍然有很大的价值。
我希望使用的格式就像这个假设的连接:
val tabaDf = spark.readStream(..)
val tabbDf = spark.readStream(..)
val joinSql = """select a.*,
b.productName
from taba
join tabb
on a.productId = b.productId
where ..
group by ..
having ..
order by .."""
val joinedStreamingDf = spark.sql(joinSql)
Run Code Online (Sandbox Code Playgroud)
有几个项目不清楚如何做:
是tabaDf和tabbDf应该是通过定义spark.readStream:这是我的假设
如何申报taba和tabb。尝试使用
tabaDf.createOrReplaceTempView("taba")
tabbDf.createOrReplaceTempView("tabb")
Run Code Online (Sandbox Code Playgroud)
结果是
警告 ObjectStore:无法获取数据库 global_temp,返回 NoSuchObjectException
我能找到的所有示例都使用DSL和/或selectExpr()- 如下所示https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming- in-apache-spark-2-2.html
df.selectExpr("CAST(userId AS STRING) AS key", "to_json(struct(*)) AS value")
Run Code Online (Sandbox Code Playgroud)
或使用select:
sightingLoc
.groupBy("zip_code", window("start_time", "1 hour"))
.count()
.select(
to_json(struct("zip_code", "window")).alias("key"),
col("count").cast("string").alias("value"))
Run Code Online (Sandbox Code Playgroud)
这些真的是唯一的选择吗- 以至于文档中说数据框/数据集支持的所有方法static都不是真的准确?否则:关于如何纠正上述问题并直接sql使用流媒体的任何指示将不胜感激。
需要使用 将流注册为临时视图createOrReplaceTempView。AFAIKcreateOrReplaceView不是 Spark API 的一部分(也许您有一些东西可以提供到具有此类方法的类的隐式转换)。
spark.readStream(..).createOrReplaceTempView("taba")
spark.readStream(..).createOrReplaceTempView("tabb")
Run Code Online (Sandbox Code Playgroud)
现在可以使用纯 SQL 访问视图。例如,要将输出打印到控制台:
spark
.sql(joinSql)
.writeStream
.format("console")
.start()
.awaitTermination()
Run Code Online (Sandbox Code Playgroud)
编辑:问题编辑后,我看不出你的代码有什么问题。这是一个最小的工作示例。假设一个测试文件/tmp/foo/foo.csv
"a",1
"b",2
Run Code Online (Sandbox Code Playgroud)
import org.apache.spark.sql.types._
val schema = StructType(Array(StructField("s", StringType), StructField("i", IntegerType)))
spark.readStream
.schema(schema)
.csv("/tmp/foo")
.createOrReplaceTempView("df1")
spark.readStream
.schema(schema)
.csv("/tmp/foo")
.createOrReplaceTempView("df2")
spark.sql("SELECT * FROM df1 JOIN df2 USING (s)")
.writeStream
.format("console")
.start()
.awaitTermination()
Run Code Online (Sandbox Code Playgroud)
产出
-------------------------------------------
Batch: 0
-------------------------------------------
+---+---+---+
| s| i| i|
+---+---+---+
| b| 2| 2|
| a| 1| 1|
+---+---+---+
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1169 次 |
| 最近记录: |