Shi*_*kou 6 scala apache-spark apache-spark-sql spark-structured-streaming
我有一些代码将两个流DataFrames和输出连接到控制台。
val dataFrame1 =
df1Input.withWatermark("timestamp", "40 seconds").as("A")
val dataFrame2 =
df2Input.withWatermark("timestamp", "40 seconds").as("B")
val finalDF: DataFrame = dataFrame1.join(dataFrame2,
expr(
"A.id = B.id" +
" AND " +
"B.timestamp >= A.timestamp " +
" AND " +
"B.timestamp <= A.timestamp + interval 1 hour")
, joinType = "leftOuter")
finalDF.writeStream.format("console").start().awaitTermination()
Run Code Online (Sandbox Code Playgroud)
我现在想要的是重构这部分以使用Datasets,这样我就可以进行一些compile-time检查。
所以我尝试的非常简单:
val finalDS: Dataset[(A,B)] = dataFrame1.as[A].joinWith(dataFrame2.as[B],
expr(
"A.id = B.id" +
" AND " +
"B.timestamp >= A.timestamp " +
" AND " +
"B.timestamp <= A.timestamp + interval 1 hour")
, joinType = "leftOuter")
finalDS.writeStream.format("console").start().awaitTermination()
Run Code Online (Sandbox Code Playgroud)
但是,这会产生以下错误:
org.apache.spark.sql.AnalysisException:如果连接键中没有水印,或者可空端没有水印和适当的范围条件,则不支持两个流式数据帧/数据集之间的流外连接;;
可以看到,join代码没变,所以两边都有水印,还有范围条件。唯一的变化是使用DatasetAPI 而不是DataFrame.
另外,当我使用 inner 时很好join:
val finalDS: Dataset[(A,B)] = dataFrame1.as[A].joinWith(dataFrame2.as[B],
expr(
"A.id = B.id" +
" AND " +
"B.timestamp >= A.timestamp " +
" AND " +
"B.timestamp <= A.timestamp + interval 1 hour")
)
finalDS.writeStream.format("console").start().awaitTermination()
Run Code Online (Sandbox Code Playgroud)
有谁知道这怎么会发生?
好吧,当您使用joinWith方法而不是join依赖不同的实现时,该实现似乎不支持流数据集的leftOuter join。
您可以通过官方文档的水印部分检查外连接。join未使用方法joinWith。请注意,结果类型将为DataFrame. 这意味着您很可能必须手动映射字段
val finalDS = dataFrame1.as[A].join(dataFrame2.as[B],
expr(
"A.key = B.key" +
" AND " +
"B.timestamp >= A.timestamp " +
" AND " +
"B.timestamp <= A.timestamp + interval 1 hour"),
joinType = "leftOuter").select(/* useful fields */).as[C]
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1718 次 |
| 最近记录: |