I am using Spark 2.2.0.
How can I feed an Amazon SQS stream into Spark Structured Streaming using pyspark?
This question tackles it for (non-structured) Spark Streaming and Scala by creating a custom receiver.
Is there anything similar in pyspark?
spark.readStream \
    .format("s3-sqs") \
    .option("fileFormat", "json") \
    .option("queueUrl", ...) \
    .schema(...) \
    .load()
According to Databricks, the source above can be used for the S3-SQS file source. But for SQS alone, what approach can be taken?
I tried to make sense of receiving messages with AWS SQS receive_message, but it is not clear how to feed that stream directly into Spark streaming.
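One workaround, since open-source Spark has no native SQS source: poll the queue yourself and bridge the messages into a directory that the Structured Streaming file source watches. A minimal sketch using boto3 (the queue URL, staging directory, and file layout are assumptions for illustration, not a standard recipe):

import os
import uuid
import boto3

queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789012/my-queue'  # hypothetical queue
stage_dir = '/tmp/sqs-stage'                                             # hypothetical staging directory

sqs = boto3.client('sqs')
while True:
    # long-poll SQS for up to 10 messages at a time
    resp = sqs.receive_message(QueueUrl=queue_url,
                               MaxNumberOfMessages=10,
                               WaitTimeSeconds=20)
    for msg in resp.get('Messages', []):
        # write to a hidden temp file, then rename: the file source skips
        # dot-files, so Spark never picks up a half-written file
        tmp = os.path.join(stage_dir, '.%s.tmp' % uuid.uuid4())
        with open(tmp, 'w') as f:
            f.write(msg['Body'])
        os.rename(tmp, os.path.join(stage_dir, '%s.json' % uuid.uuid4()))
        sqs.delete_message(QueueUrl=queue_url,
                           ReceiptHandle=msg['ReceiptHandle'])

The Spark side is then an ordinary file stream, e.g. spark.readStream.schema(my_schema).json(stage_dir), where my_schema matches whatever your SQS payloads contain.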
amazon-sqs apache-spark pyspark-sql spark-structured-streaming
In Python, I was able to write socket-programming code that receives a stream of data over UDP.
How can I do the same in R with equivalent code?
import socket, traceback

host = ''
port = 5555

s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
s.bind((host, port))

counter = 1500
while counter > 0:
    counter -= 1
    try:
        message, address = s.recvfrom(8192)
        message = message.decode()
        data = message.split(",")
        print(data)
    except (KeyboardInterrupt, SystemExit):
        raise
    except:
        traceback.print_exc()
In R I tried the code below, without success. I do realize I need to specify that it is UDP and so on, but I could not find those settings.
I need to receive a "stream" of data from a device. Do I need to run an R server?
platform: x86_64-pc-linux-gnu
os: linux-gnu
version.string: R version 3.2.3 (2015-12-10), RStudio version 1.0.44
server <- function() {
  while (TRUE) {
    writeLines("Listening...")
    # socketConnection() opens a TCP server socket; base R offers no UDP mode here
    con <- socketConnection(host = "localhost", port = 5555, blocking = TRUE,
                            server = TRUE, open = "r+")
    data <- readLines(con, 1)
    print(data)
    close(con)
  }
}
server()
The example mentioned in http://spark.apache.org/docs/latest/streaming-programming-guide.html lets me receive packets over a TCP stream by listening on port 9999:
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Create a local StreamingContext with two working threads and batch interval of 1 second.
// The master requires 2 cores to prevent from a starvation scenario.
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
// Split each line into words ...
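The streaming guide feeds this example with nc -lk 9999; a minimal Python stand-in for that feeder (a sketch, with a made-up payload) would be:

import socket
import time

# listen on TCP 9999 and push newline-delimited text to whichever
# client connects (here, Spark's socketTextStream receiver)
srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
srv.bind(('localhost', 9999))
srv.listen(1)
conn, _ = srv.accept()
while True:
    conn.sendall(b'hello spark streaming\n')
    time.sleep(1)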
When running the example given in the Python Spark Structured Streaming programming guide [link]
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
I get the following error:
TypeError: 'Builder' object is not callable
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
spark = SparkSession.builder()\
    .appName("StructuredNetworkWordCount")\
    .getOrCreate()
# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines = spark\
.readStream\
.format('socket')\
.option('host', 'localhost')\
.option('port', 9999)\
.load()
# Split the lines into words
words = lines.select(
    explode(
        split(lines.value, ' ')
    ).alias('word')
)
# Generate running word count
wordCounts = words.groupBy('word').count()
# Start running the query that prints the running counts to the ...
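The TypeError above comes from the parentheses: in pyspark, SparkSession.builder is an attribute that already holds a Builder instance, so calling it fails. Dropping the parentheses is the fix:

# builder is an attribute, not a method; builder() raises
# TypeError: 'Builder' object is not callable
spark = SparkSession.builder\
    .appName("StructuredNetworkWordCount")\
    .getOrCreate()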
How do I set a schema for a streaming DataFrame in PySpark?
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
# Import data types
from pyspark.sql.types import *
spark = SparkSession\
    .builder\
    .appName("StructuredNetworkWordCount")\
    .getOrCreate()
# Create DataFrame representing the stream of input lines from connection to 192.168.0.113:5560
lines = spark\
    .readStream\
    .format('socket')\
    .option('host', '192.168.0.113')\
    .option('port', 5560)\
    .load()
For example, I need a table like this:
Name, lastName, PhoneNumber
Bob, Dylan, 123456
Jack, Ma, 789456
....
How do I set the header/schema to ['Name', 'lastName', 'PhoneNumber'] along with their data types?
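The socket source always produces a single string column named value, so a schema is applied by parsing that column rather than by passing one to the reader. A minimal sketch (splitting on commas; the long cast for PhoneNumber is an assumption based on the sample rows):

from pyspark.sql.functions import split
from pyspark.sql.types import LongType

# each input line looks like "Bob,Dylan,123456"; split it on commas
parts = split(lines['value'], ',')
people = lines.select(
    parts.getItem(0).alias('Name'),
    parts.getItem(1).alias('lastName'),
    parts.getItem(2).cast(LongType()).alias('PhoneNumber'),
)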
Also, is it possible to display this table, i.e. the DataFrame, continuously? When I try, I get the error
"pyspark.sql.utils.AnalysisException: 'Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;;\nProject"
apache-spark apache-spark-sql pyspark pyspark-sql spark-structured-streaming