Not*_*tMe 5 python-3.x apache-spark pyspark spark-structured-streaming
问题:如何将 JSON 字符串转换为 DataFrame 并仅选择我想要的键?
我上周刚刚开始使用 Spark,我仍在学习中,所以请耐心等待。
我正在使用 Spark(2.4) 结构化流。Spark 应用程序从 Twitter 流中获取数据(通过套接字),发送的数据是完整的 Twitter JSON 字符串。下面是其中一个数据帧。每一行都是完整的 JSON 推文。
+--------------------+
| value|
+--------------------+
|{"created_at":"Tu...|
|{"created_at":"Tu...|
|{"created_at":"Tu...|
+--------------------+
Run Code Online (Sandbox Code Playgroud)
正如 Venkata 所建议的,我这样做了,翻译成 python(完整代码如下)
schema = StructType().add('created_at', StringType(), False).add('id_str', StringType(), False)
df = lines.selectExpr('CAST(value AS STRING)').select(from_json('value', schema).alias('temp')).select('temp.*')
Run Code Online (Sandbox Code Playgroud)
这是返回值
+------------------------------+-------------------+
|created_at |id_str |
+------------------------------+-------------------+
|Wed Feb 20 04:51:18 +0000 2019|1098082646511443968|
|Wed Feb 20 04:51:18 +0000 2019|1098082646285082630|
|Wed Feb 20 04:51:18 +0000 2019|1098082646444441600|
|Wed Feb 20 04:51:18 +0000 2019|1098082646557642752|
|Wed Feb 20 04:51:18 +0000 2019|1098082646494797824|
|Wed Feb 20 04:51:19 +0000 2019|1098082646817681408|
+------------------------------+-------------------+
Run Code Online (Sandbox Code Playgroud)
可以看到,DataFrame 中只包含了我想要的 2 个键。
希望这对任何新手都有帮助。
完整代码
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StringType
spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()
sc = spark.sparkContext
lines = spark.readStream.format('socket').option('host', '127.0.0.1').option('port', 9999).load()
schema = StructType().add('created_at', StringType(), False).add('id_str', StringType(), False)
df = lines.selectExpr('CAST(value AS STRING)').select(from_json('value', schema).alias('temp')).select('temp.*')
query = df.writeStream.format('console').option('truncate', 'False').start()
# this part is only used to print out the query when running as an app. Not needed if using jupyter
import time
time.sleep(10)
lines.stop()
Run Code Online (Sandbox Code Playgroud)
下面是一个示例代码片段,您可以使用它从 json 转换为 DataFrame。
val schema = new StructType().add("id", StringType).add("pin",StringType)
val dataFrame= data.
selectExpr("CAST(value AS STRING)").as[String].
select(from_json($"value",schema).
alias("tmp")).
select("tmp.*")
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3236 次 |
| 最近记录: |