use*_*581 2 apache-spark apache-spark-sql pyspark pyspark-sql spark-structured-streaming
如何DataFrame在 PySpark 中为流设置模式。
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
# Import data types
from pyspark.sql.types import *
spark = SparkSession\
.builder\
.appName("StructuredNetworkWordCount")\
.getOrCreate()
# Create DataFrame representing the stream of input lines from connection to localhost:5560
lines = spark\
.readStream\
.format('socket')\
.option('host', '192.168.0.113')\
.option('port', 5560)\
.load()
Run Code Online (Sandbox Code Playgroud)
例如,我需要一个像这样的表:
Name, lastName, PhoneNumber
Bob, Dylan, 123456
Jack, Ma, 789456
....
Run Code Online (Sandbox Code Playgroud)
如何将标题/模式设置为 ['Name','lastName','PhoneNumber'] 及其数据类型。
另外,是否可以连续显示此表,或者说DataFrame. 当我尝试时,我收到错误
"pyspark.sql.utils.AnalysisException: '在流式数据帧/数据集上没有流式聚合时不支持完整输出模式;;\nProject"
TextSocketSource不提供任何集成的解析选项。只能使用以下两种格式之一:
时间戳和文本 ifincludeTimestamp设置为true具有以下架构:
StructType([
StructField("value", StringType()),
StructField("timestamp", TimestampType())
])
Run Code Online (Sandbox Code Playgroud)text only ifincludeTimestamp设置为false使用架构,如下所示:
StructType([StructField("value", StringType())]))
Run Code Online (Sandbox Code Playgroud)如果要更改此格式,则必须转换流以提取感兴趣的字段,例如使用正则表达式:
from pyspark.sql.functions import regexp_extract
from functools import partial
fields = partial(
regexp_extract, str="value", pattern="^(\w*)\s*,\s*(\w*)\s*,\s*([0-9]*)$"
)
lines.select(
fields(idx=1).alias("name"),
fields(idx=2).alias("last_name"),
fields(idx=3).alias("phone_number")
)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1225 次 |
| 最近记录: |