使用套接字的 Spark Structured Streaming,设置 SCHEMA,在控制台中显示 DATAFRAME

use*_*581 2 apache-spark apache-spark-sql pyspark pyspark-sql spark-structured-streaming

如何DataFrame在 PySpark 中为流设置模式。

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
# Import data types
from pyspark.sql.types import *

spark = SparkSession\
    .builder\
    .appName("StructuredNetworkWordCount")\
    .getOrCreate()

# Create DataFrame representing the stream of input lines from connection to localhost:5560
lines = spark\
   .readStream\
   .format('socket')\
   .option('host', '192.168.0.113')\
   .option('port', 5560)\
   .load()
Run Code Online (Sandbox Code Playgroud)

例如,我需要一个像这样的表:

Name,  lastName,   PhoneNumber    
Bob, Dylan, 123456    
Jack, Ma, 789456
....
Run Code Online (Sandbox Code Playgroud)

如何将标题/模式设置为 ['Name','lastName','PhoneNumber'] 及其数据类型。

另外,是否可以连续显示此表,或者说DataFrame. 当我尝试时,我收到错误

"pyspark.sql.utils.AnalysisException: '在流式数据帧/数据集上没有流式聚合时不支持完整输出模式;;\nProject"

zer*_*323 5

TextSocketSource不提供任何集成的解析选项。只能使用以下两种格式之一:

如果要更改此格式,则必须转换流以提取感兴趣的字段,例如使用正则表达式:

from pyspark.sql.functions import regexp_extract
from functools import partial

fields = partial(
    regexp_extract, str="value", pattern="^(\w*)\s*,\s*(\w*)\s*,\s*([0-9]*)$"
)

lines.select(
    fields(idx=1).alias("name"),
    fields(idx=2).alias("last_name"), 
    fields(idx=3).alias("phone_number")
)
Run Code Online (Sandbox Code Playgroud)