use*_*018 5 postgresql apache-kafka apache-spark spark-streaming
我使用 PostGre 作为数据库。我想为每个批次捕获一个表数据并将其转换为 parquet 文件并存储到 s3 中。我尝试使用 Spark 和 readStream 的 JDBC 选项进行连接,如下所示......
val jdbcDF = spark.readStream
.format("jdbc")
.option("url", "jdbc:postgresql://myserver:5432/mydatabase")
.option("dbtable", "database.schema.table")
.option("user", "xxxxx")
.option("password", "xxxxx")
.load()
Run Code Online (Sandbox Code Playgroud)
但它抛出了不受支持的异常
Exception in thread "main" java.lang.UnsupportedOperationException: Data source jdbc does not support streamed reading
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:234)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:87)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:87)
at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:150)
at examples.SparkJDBCStreaming$.delayedEndpoint$examples$SparkJDBCStreaming$1(SparkJDBCStreaming.scala:16)
at examples.SparkJDBCStreaming$delayedInit$body.apply(SparkJDBCStreaming.scala:5)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
Run Code Online (Sandbox Code Playgroud)
我走在正确的轨道上吗?Spark Streaming 真的不支持数据库作为数据源吗?
据我所知,执行此操作的另一种方法是编写一个 kafka 生产者将数据发布到 kafka 主题,然后使用 Spark Streaming...
注意:我不想为此使用 kafka connect,因为我需要做一些辅助转换。
这是唯一的方法吗?
这样做的正确方法是什么?有这样的例子吗?请协助!
归档时间: |
|
查看次数: |
8398 次 |
最近记录: |