Posts by use*_*581

How do I load streaming data from Amazon SQS?

I am using Spark 2.2.0.

How can I feed an Amazon SQS stream into Spark Structured Streaming using PySpark?

This question attempts to solve it for unstructured (DStream) streaming and Scala by creating a custom receiver.
Is there anything similar in PySpark?

spark.readStream \
   .format("s3-sqs") \
   .option("fileFormat", "json") \
   .option("queueUrl", ...) \
   .schema(...) \
   .load()

According to Databricks, the source above can be used for the S3-SQS file source. But for SQS alone, what approach can be taken?

I tried to make sense of receiving messages with AWS SQS ReceiveMessage, but how to feed that stream directly into Spark Structured Streaming is unclear.
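(Note: open-source Spark has no built-in SQS source; the s3-sqs format above is Databricks-only. One common workaround is to bridge SQS to a file-based stream yourself. Below is a minimal sketch of that idea, assuming a hypothetical queue URL, staging directory, and one-field JSON schema: it polls SQS with boto3, lands each message body as a JSON file, and lets Spark's file source stream the files in.)

# Sketch only: bridge SQS to Spark's file source. The queue URL,
# staging directory, and schema are hypothetical placeholders.
import os
import uuid

import boto3
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue"
STAGING_DIR = "/tmp/sqs-staging"
os.makedirs(STAGING_DIR, exist_ok=True)

def drain_queue_once(sqs):
    """Fetch up to 10 messages, write each body as a JSON file, then delete it."""
    resp = sqs.receive_message(
        QueueUrl=QUEUE_URL,
        MaxNumberOfMessages=10,
        WaitTimeSeconds=20,  # long polling
    )
    for msg in resp.get("Messages", []):
        path = os.path.join(STAGING_DIR, f"{uuid.uuid4()}.json")
        with open(path, "w") as f:
            f.write(msg["Body"])
        sqs.delete_message(QueueUrl=QUEUE_URL, ReceiptHandle=msg["ReceiptHandle"])

spark = SparkSession.builder.appName("SqsFileBridge").getOrCreate()

# File sources require an explicit schema in Structured Streaming;
# this assumes each message body is a JSON object with a 'payload' field.
schema = StructType([StructField("payload", StringType())])
stream = spark.readStream.schema(schema).json(STAGING_DIR)
query = stream.writeStream.format("console").start()

sqs = boto3.client("sqs")
while query.isActive:  # keep polling while the streaming query runs
    drain_queue_once(sqs)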

amazon-sqs apache-spark pyspark-sql spark-structured-streaming

13 votes · 1 answer · 2719 views

Socket programming in R to receive a UDP stream

In Python, I was able to write socket code that receives a stream of data over UDP.

However, what is the equivalent code to do the same in R?

import socket
import traceback

host = ''    # bind to all interfaces
port = 5555

s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)   # UDP socket
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
s.bind((host, port))

counter = 1500
while counter > 0:
    counter -= 1
    try:
        message, address = s.recvfrom(8192)
        message = message.decode()
        data = message.split(",")
        print(data)
    except (KeyboardInterrupt, SystemExit):
        raise
    except Exception:
        traceback.print_exc()

In R I tried the code below, without success. I do realize I need to specify that it is UDP and so on, but I could not find those settings.

I need to receive a "stream" of data from a device. Do I need to have an R server?

platform: x86_64-pc-linux-gnu
os: linux-gnu
version.string: R version 3.2.3 (2015-12-10), RStudio version 1.0.44

server <- function() {
  while (TRUE) {
    writeLines("Listening...")
    # socketConnection() in base R opens a TCP connection;
    # it cannot receive UDP datagrams.
    con <- socketConnection(host = "localhost", port = 5555, blocking = TRUE,
                            server = TRUE, open = "r+")
    data <- readLines(con, 1)
    print(data)
    close(con)
  }
}

server()

r

7 votes · 1 answer · 1298 views

Spark Scala: receiving UDP on a listening port

The example mentioned in http://spark.apache.org/docs/latest/streaming-programming-guide.html lets me receive packets over a TCP stream, listening on port 9999:

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent from a starvation scenario.

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))


 // Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
// Split each line into words …

sockets udp scala apache-spark spark-streaming

3 votes · 1 answer · 2441 views

TypeError: 'Builder' object is not callable in Spark Structured Streaming

When running the example given in the Python Structured Streaming programming guide
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

I get the following error:
TypeError: 'Builder' object is not callable

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

spark = SparkSession.builder()\
    .appName("StructuredNetworkWordCount")\
    .getOrCreate()

# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines = spark\
   .readStream\
   .format('socket')\
   .option('host', 'localhost')\
   .option('port', 9999)\
   .load()

# Split the lines into words
words = lines.select(
   explode(
       split(lines.value, ' ')
   ).alias('word')
)

# Generate running word count
wordCounts = words.groupBy('word').count()

# Start running the query that prints the running counts to the …
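(For reference: in PySpark, SparkSession.builder is a class attribute rather than a method, so calling it with parentheses, as SparkSession.builder() does above, raises exactly this TypeError. A minimal corrected sketch:)

from pyspark.sql import SparkSession

# 'builder' is accessed without parentheses; calling it as a method
# is what triggers the "'Builder' object is not callable" error.
spark = SparkSession.builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()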

apache-spark apache-spark-sql spark-structured-streaming

2 votes · 1 answer · 2834 views

Spark Structured Streaming over a socket: setting the SCHEMA and displaying the DATAFRAME in the console

How do I set the schema of a streaming DataFrame in PySpark?

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
# Import data types
from pyspark.sql.types import *

spark = SparkSession\
    .builder\
    .appName("StructuredNetworkWordCount")\
    .getOrCreate()

# Create DataFrame representing the stream of input lines from connection to localhost:5560
lines = spark\
   .readStream\
   .format('socket')\
   .option('host', '192.168.0.113')\
   .option('port', 5560)\
   .load()

For example, I need a table like this:

Name,  lastName,   PhoneNumber    
Bob, Dylan, 123456    
Jack, Ma, 789456
....

How do I set the header/schema to ['Name', 'lastName', 'PhoneNumber'], with their data types?

Also, is it possible to display this table, i.e. the DataFrame, continuously? When I try, I get the error

"pyspark.sql.utils.AnalysisException: 'Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;;\nProject"

apache-spark apache-spark-sql pyspark pyspark-sql spark-structured-streaming

2 votes · 1 answer · 1225 views