mav*_*rik · 5 · Tags: apache-spark, spark-structured-streaming
Runtime: Spark 2.3.0, Scala 2.11 (Databricks 4.1 ML beta)
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
// Kafka settings and df definition go here
val query = df.writeStream.format("parquet")
.option("path", ...)
.option("checkpointLocation",...)
.trigger(continuous(30000))
.outputMode(OutputMode.Append)
.start
This throws the error: not found: value continuous
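A minimal sketch, assuming Spark 2.3.x `spark-sql` on the classpath. In the Scala API there is no bare `continuous` value in scope, so `continuous(30000)` cannot compile; the keyword form `continuous = "30 seconds"` appears to follow the PySpark `trigger()` signature rather than the Scala one. The Scala factory is `Trigger.Continuous`, which has overloads taking milliseconds, an interval string, a `scala.concurrent.duration.Duration`, or a value plus a `TimeUnit`. The object name `ContinuousTriggerForms` is hypothetical, and building a trigger this way does not start a query, so it says nothing yet about whether a given sink accepts it.

```scala
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._

// Sketch (assumes Spark 2.3.x): equivalent ways to build a 30-second continuous trigger.
// A bare `continuous(...)` is not in scope in Scala; the factory lives on `Trigger`.
object ContinuousTriggerForms {
  val fromMillis: Trigger   = Trigger.Continuous(30000L)       // interval in milliseconds
  val fromString: Trigger   = Trigger.Continuous("30 seconds") // parsed as a calendar interval
  val fromDuration: Trigger = Trigger.Continuous(30.seconds)   // scala.concurrent.duration
  val fromUnit: Trigger     = Trigger.Continuous(30, java.util.concurrent.TimeUnit.SECONDS)

  def main(args: Array[String]): Unit =
    Seq(fromMillis, fromString, fromDuration, fromUnit).foreach(println)
}
```

Each of the four values should print as `ContinuousTrigger(30000)`, the same representation that appears in the `IllegalStateException` message for the 1-second case.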
Other attempts that didn't work:
.trigger(continuous = "30 seconds") //as per Databricks blog
// throws same error as above
.trigger(Trigger.Continuous("1 second")) //as per Spark docs
// throws java.lang.IllegalStateException: Unknown type of trigger: ContinuousTrigger(1000)
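The `IllegalStateException` looks like a sink problem rather than a trigger-syntax problem: the Spark 2.3.0 programming guide lists only the Kafka, memory and console sinks as supported in continuous mode, and the `parquet` file sink is not among them, so the query appears to fall back to micro-batch execution, which does not recognize a `ContinuousTrigger`. The sketch below, assuming a local Spark 2.3.x session, runs the same `Trigger.Continuous("1 second")` against the built-in `rate` source and `console` sink. The object and method names (`ContinuousConsoleSketch`, `runFor`), the app name and the local settings are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

// Sketch (assumes Spark 2.3.x): a continuous query over a source and sink that the
// 2.3.0 guide lists as supported in continuous mode (rate source, console sink).
object ContinuousConsoleSketch {
  // Hypothetical helper: starts the query, lets it run for `millis`, and reports
  // whether it was still active afterwards (i.e. the continuous trigger was accepted).
  def runFor(spark: SparkSession, millis: Long): Boolean = {
    val df = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .option("numPartitions", "1") // each partition holds a core for the whole run
      .load()

    val query = df.writeStream
      .format("console")                       // supported sink, unlike parquet in 2.3.0
      .outputMode(OutputMode.Append)           // continuous mode only supports Append
      .trigger(Trigger.Continuous("1 second")) // checkpoint interval, not a batch interval
      .start()

    try {
      query.awaitTermination(millis) // returns early only if the query terminates
      query.isActive
    } finally {
      query.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]") // assumption: local run with a spare core
      .appName("continuous-trigger-sketch")
      .getOrCreate()
    try println(s"still active after 10 s: ${runFor(spark, 10000L)}")
    finally spark.stop()
  }
}
```

Under this reading, writing Parquet in Spark 2.3.0 would need a micro-batch trigger such as `Trigger.ProcessingTime("30 seconds")`, or a continuous query that writes to Kafka instead.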
References:
(Databricks blog) https://databricks.com/blog/2018/03/20/low-latency-continuous-processing-mode-in-structured-streaming-in-apache-spark-2-3-0.html
(Spark programming guide) http://spark.apache.org/docs/2.3.0/structured-streaming-programming-guide.html#continuous-processing
(Scaladoc) https://spark.apache.org/docs/2.3.0/api/scala/index.html#org.apache.spark.sql.streaming.package