小编Luk*_*kiz的帖子

如何为Spark结构化流编写JDBC Sink [SparkException:Task not serializable]?

我需要一个JDBC接收器用于我的Spark结构化流数据帧.目前,据我所知,DataFrame的API缺乏writeStreamJDBC实现(在PySpark和Scala(当前Spark版本2.2.0)中都没有).我发现的唯一建议是ForeachWriter根据本文编写自己的Scala类.

所以,我已经修改了一个简单的词从数例如这里通过添加自定义ForeachWriter类,并试图writeStream到PostgreSQL.单词流是从控制台手动生成的(使用NetCat:nc -lk -p 9999)并由Spark从套接字读取.

不幸的是,我得到"任务不可序列化"的错误.

APACHE_SPARK_VERSION = 2.1.0使用Scala版本2.11.8(Java HotSpot(TM)64位服务器VM,Java 1.8.0_112)

我的Scala代码:

//Spark context available as 'sc' (master = local[*], app id = local-1501242382770).
//Spark session available as 'spark'.

import java.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .master("local[*]")
  .appName("StructuredNetworkWordCountToJDBC")
  .config("spark.jars", "/tmp/data/postgresql-42.1.1.jar")
  .getOrCreate()

import spark.implicits._

val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

val words = lines.as[String].flatMap(_.split(" "))

val wordCounts = words.groupBy("value").count()

class JDBCSink(url: …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark spark-structured-streaming

11
推荐指数
1
解决办法
4343
查看次数