I am using Spark 2.2.0.
How can I feed an Amazon SQS stream into Spark Structured Streaming using pyspark?
This question tackles it for unstructured streaming and Scala by creating a custom receiver.
Is there anything similar in pyspark?
spark.readStream \
    .format("s3-sqs") \
    .option("fileFormat", "json") \
    .option("queueUrl", ...) \
    .schema(...) \
    .load()
According to Databricks, the source above works for the S3-SQS file source. But how can this be approached for SQS alone?
I looked at AWS-SQS-Receive_Message to understand how messages are received. However, it is unclear how to send that stream directly into Spark Streaming.
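For reference, a bare polling loop with boto3 (not a Spark connector; the queue URL, region, and landing directory below are placeholder assumptions) looks roughly like the sketch below. One workaround is to land each message body as a JSON file that the regular file source can then stream:

import os
import uuid
import boto3  # assumed available; not part of Spark

sqs = boto3.client('sqs', region_name='us-east-1')           # region is an assumption
queue_url = 'https://sqs.us-east-1.amazonaws.com/.../queue'  # placeholder
landing_dir = '/tmp/sqs-landing'                             # hypothetical directory
os.makedirs(landing_dir, exist_ok=True)

while True:
    # Long-poll for up to 10 messages at a time
    resp = sqs.receive_message(QueueUrl=queue_url,
                               MaxNumberOfMessages=10,
                               WaitTimeSeconds=20)
    for msg in resp.get('Messages', []):
        # Write the raw JSON body where a Structured Streaming file source
        # can pick it up, e.g. spark.readStream.schema(my_schema).json(landing_dir)
        with open(os.path.join(landing_dir, '%s.json' % uuid.uuid4()), 'w') as f:
            f.write(msg['Body'])
        sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=msg['ReceiptHandle'])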
amazon-sqs apache-spark pyspark-sql spark-structured-streaming
In Python, I was able to write socket-programming code that receives a data stream over UDP.
However, how do I do the same with equivalent code in R?
import socket, traceback

host = ''
port = 5555

# UDP socket that accepts broadcast datagrams on port 5555
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
s.bind((host, port))

counter = 1500
while counter > 0:
    counter -= 1
    try:
        # Receive one datagram and split its comma-separated payload
        message, address = s.recvfrom(8192)
        message = message.decode()
        data = message.split(",")
        print(data)
    except (KeyboardInterrupt, SystemExit):
        raise
    except:
        traceback.print_exc()
In R, I tried the code below, without success. I do realize I need to specify that it is UDP and so on, but I could not find those settings.
I need to receive a "stream" of data from a device. Do I need to have an R server?
Platform: x86_64-pc-linux-gnu
OS: linux-gnu
version.string: R version 3.2.3 (2015-12-10); RStudio version 1.0.44
server <- function() {
  while (TRUE) {
    writeLines("Listening...")
    # socketConnection() opens a TCP socket, not UDP
    con <- socketConnection(host = "localhost", port = 5555, blocking = TRUE,
                            server = TRUE, open = "r+")
    data <- readLines(con, 1)
    print(data)
    close(con)
  }
}

server()
The example mentioned in http://spark.apache.org/docs/latest/streaming-programming-guide.html lets me receive packets over a TCP stream, listening on port 9999:
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent from a starvation scenario.
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
// Split each line into words …
When running the Python example given in the Spark Structured Streaming programming guide [link]
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
I get the following error:
TypeError: 'Builder' object is not callable
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

spark = SparkSession.builder()\
    .appName("StructuredNetworkWordCount")\
    .getOrCreate()

# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines = spark\
    .readStream\
    .format('socket')\
    .option('host', 'localhost')\
    .option('port', 9999)\
    .load()

# Split the lines into words
words = lines.select(
    explode(
        split(lines.value, ' ')
    ).alias('word')
)

# Generate running word count
wordCounts = words.groupBy('word').count()

# Start running the query that prints the running counts to the …
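For what it's worth, the TypeError in the snippet above comes from `SparkSession.builder()`: `builder` is an attribute holding a `Builder` instance, not a method, so calling it with parentheses fails. Dropping the parentheses makes the snippet run:

from pyspark.sql import SparkSession

# 'builder' is an attribute (a Builder instance), so it must not be called
spark = SparkSession.builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()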
How do I set the schema for a streaming DataFrame in PySpark?
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
# Import data types
from pyspark.sql.types import *

spark = SparkSession\
    .builder\
    .appName("StructuredNetworkWordCount")\
    .getOrCreate()

# Create DataFrame representing the stream of input lines from connection to localhost:5560
lines = spark\
    .readStream\
    .format('socket')\
    .option('host', '192.168.0.113')\
    .option('port', 5560)\
    .load()
For example, I need a table like this:
Name, lastName, PhoneNumber
Bob, Dylan, 123456
Jack, Ma, 789456
....
How do I set the header/schema to ['Name', 'lastName', 'PhoneNumber'] along with their data types?
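The socket source always produces a single string column named `value`, so a schema cannot be attached directly at `load()` time. A minimal sketch (assuming comma-separated input lines like the table above, an integer phone number, and `people` as a hypothetical name; `lines` is the frame defined earlier) derives typed, named columns from `value` instead:

from pyspark.sql.functions import split
from pyspark.sql.types import IntegerType

# Each incoming line is assumed to look like "Bob,Dylan,123456"
fields = split(lines.value, ',')
people = lines.select(
    fields.getItem(0).alias('Name'),
    fields.getItem(1).alias('lastName'),
    fields.getItem(2).cast(IntegerType()).alias('PhoneNumber'),
)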
Also, is it possible to display this table (the DataFrame) continuously? When I try, I get the error:
"pyspark.sql.utils.AnalysisException: 'Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;;\nProject"
apache-spark apache-spark-sql pyspark pyspark-sql spark-structured-streaming