我需要一个JDBC接收器用于我的Spark结构化流数据帧.目前,据我所知,DataFrame的API缺乏writeStreamJDBC实现(在PySpark和Scala(当前Spark版本2.2.0)中都没有).我发现的唯一建议是ForeachWriter根据本文编写自己的Scala类.
所以,我已经修改了一个简单的词从数例如这里通过添加自定义ForeachWriter类,并试图writeStream到PostgreSQL.单词流是从控制台手动生成的(使用NetCat:nc -lk -p 9999)并由Spark从套接字读取.
不幸的是,我得到"任务不可序列化"的错误.
APACHE_SPARK_VERSION = 2.1.0使用Scala版本2.11.8(Java HotSpot(TM)64位服务器VM,Java 1.8.0_112)
我的Scala代码:
//Spark context available as 'sc' (master = local[*], app id = local-1501242382770).
//Spark session available as 'spark'.
import java.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder
.master("local[*]")
.appName("StructuredNetworkWordCountToJDBC")
.config("spark.jars", "/tmp/data/postgresql-42.1.1.jar")
.getOrCreate()
import spark.implicits._
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()
class JDBCSink(url: …Run Code Online (Sandbox Code Playgroud)