如何从Amazon SQS加载流数据?

use*_*581 13 amazon-sqs apache-spark pyspark-sql spark-structured-streaming

我使用Spark 2.2.0.

如何使用pyspark将Amazon SQS流提供给spark结构化流?

这个问题试图通过创建自定义接收器来解决非结构化流和scala的问题.
pyspark中有类似的东西吗?

spark.readStream \
   .format("s3-sqs") \
   .option("fileFormat", "json") \
   .option("queueUrl", ...) \
   .schema(...) \
   .load()
Run Code Online (Sandbox Code Playgroud)

根据Databricks上面的接收器可以用于S3-SQS文件源.但是,对于只有SQS,如何才能采用一种方法.

我尝试从AWS-SQS-Receive_Message理解接收消息.但是,如何直接将流发送到火花流还不清楚.

Jac*_*ski 5

我对Amazon SQS一无所知,但是“如何使用pyspark将Amazon SQS流提供给结构化流。” 对于任何外部消息传递系统或使用Spark结构化流(也称为Spark“流”)的数据源,都无法使用。

在Spark结构化流中,这是Spark的另一种方式,它是Spark定期提取数据(类似于Kafka Consumer API在未提取数据的情况下的工作方式)。

换句话说,Spark“流”只是Amazon SQS中“队列”中消息的另一个使用者。

每当我被要求将外部系统与Spark“ Streams”集成时,我就会开始使用客户端/消费者API为系统编写客户端。

一旦有了它,下一步就是使用上面的示例客户端代码为外部系统(例如Amazon SQS)开发自定义流

开发自定义流式传输时,Source您必须执行以下步骤:

  1. 编写实现Source特征的Scala类

  2. Source使用META-INF/services/org.apache.spark.sql.sources.DataSourceRegister具有完全限定的类名称的文件向Spark SQL 注册Scala类(自定义),或在中使用完全限定的类名称format

拥有自定义流源是一个分为两部分的开发,其中包括开发源(并可选地将其注册到Spark SQL)并通过方法在Spark Structured Streaming应用程序(在Python中)中使用它format