如何创建自定义流数据源?

szu*_*szu 7 apache-spark spark-structured-streaming

我有一个Spark Streaming自定义阅读器,可以从WebSocket读取数据.我将尝试Spark Structured Streaming.

如何在Spark Structured Streaming中创建流数据源?

alz*_*lz2 12

随着Spark转向V2 API,您现在必须实现DataSourceV2,MicroBatchReadSupportDataSourceRegister.

这将涉及到创造自己的实现Offset,MicroBatchReader,DataReader<Row>,和DataReaderFactory<Row>.

有一些例子在线这是对我很有帮助书面我的自定义结构化数据流的例子(Scala中).

一旦实现了自定义源,就可以按照Jacek Laskowski的说明注册源代码.

此外,根据您从套接字接收的消息的编码,您可以只使用默认套接字源并使用自定义映射函数将信息解析为您将使用的任何Bean.虽然请注意Spark说默认套接字流源不应该用于生产!

希望这可以帮助!


Jac*_*ski 6

流数据源实现org.apache.spark.sql.execution.streaming.Source.

scaladoc org.apache.spark.sql.execution.streaming.Source应该为您提供足够的信息以便开始(只需按照类型开发可编译的Scala类型).

一旦你的Source,你必须注册,所以你可以使用它formatDataStreamReader.使流媒体源可用以便您可以使用的技巧format是通过创建DataSourceRegister流媒体源来注册它.您可以在META-INF/services/org.apache.spark.sql.sources.DataSourceRegister中找到示例:

org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
org.apache.spark.sql.execution.datasources.json.JsonFileFormat
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
org.apache.spark.sql.execution.datasources.text.TextFileFormat
org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
org.apache.spark.sql.execution.streaming.TextSocketSourceProvider
org.apache.spark.sql.execution.streaming.RateSourceProvider
Run Code Online (Sandbox Code Playgroud)

这是将短名称链接format到实现的文件.

我通常建议人们在Spark研讨会期间做的是从双方开始开发:

  1. 编写流式查询(带format),例如

    val input = spark
      .readStream
      .format("yourCustomSource") // <-- your custom source here
      .load
    
    Run Code Online (Sandbox Code Playgroud)
  2. 实现流媒体Source和相应的DataSourceRegister(它可能是同一个类)

  3. (可选)注册DataSourceRegister通过编写完全合格的类名,比方说com.mycompany.spark.MyDataSourceRegister,到META-INF/services/org.apache.spark.sql.sources.DataSourceRegister:

    $ cat META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
    com.mycompany.spark.MyDataSourceRegister
    
    Run Code Online (Sandbox Code Playgroud)

注册DataSourceRegister自定义实现的最后一步Source是可选的,仅用于注册最终用户在DataFrameReader.format方法中使用的数据源别名.

format(source:String):DataFrameReader 指定输入数据源格式.

查看org.apache.spark.sql.execution.streaming.RateSourceProvider的代码,以获得良好的开端.