Laz*_*zer 5 azure azure-eventhub pyspark databricks spark-structured-streaming
我在 Python 中有一个 Azure Databricks 脚本,它使用结构化流从事件中心读取 JSON 消息,处理消息并将结果保存在数据湖存储中。消息从 Azure 逻辑应用发送到事件中心,该应用从 Twitter API 读取推文。
我正在尝试反序列化事件中心消息的正文,以便处理其内容。消息正文首先从二进制转换为字符串值,然后使用该from_json函数反序列化为结构类型,如本文所述:https : //databricks.com/blog/2017/02/23/working-complex-data-格式结构化流媒体apache-spark-2-1.html
这是一个代码示例(带有混淆的参数):
from pyspark.sql.functions import from_json, to_json
from pyspark.sql.types import DateType, StringType, StructType
EVENT_HUB_CONN_STRING = 'Endpoint=sb://myehnamespace.servicebus.windows.net/;SharedAccessKeyName=Listen;SharedAccessKey=xxx;EntityPath=myeh'
OUTPUT_DIR = '/mnt/DataLake/output'
CHECKPOINT_DIR = '/mnt/DataLake/checkpoint'
event_hub_conf = {
'eventhubs.connectionString' : EVENT_HUB_CONN_STRING
}
stream_data = spark \
.readStream \
.format('eventhubs') \
.options(**event_hub_conf) \
.option('multiLine', True) \
.option('mode', 'PERMISSIVE') \
.load()
schema = StructType() \
.add('FetchTimestampUtc', DateType()) \
.add('Username', StringType()) \
.add('Name', StringType()) \
.add('TweetedBy', StringType()) \
.add('Location', StringType()) \
.add('TweetText', StringType())
stream_data_body = stream_data \
.select(stream_data.body) \
.select(from_json('body', schema).alias('body')) \
.select(to_json('body').alias('body'))
# This works (bare string value, no deserialization):
# stream_data_body = stream_data.select(stream_data.body)
stream_data_body \
.writeStream \
.outputMode('append') \
.format('json') \
.option('path', OUTPUT_DIR) \
.option('checkpointLocation', CHECKPOINT_DIR) \
.start() \
.awaitTermination()
Run Code Online (Sandbox Code Playgroud)
在这里我实际上还没有做任何处理,只是一个简单的反序列化/序列化。
上述脚本确实向 Data Lake 生成输出,但结果 JSON 对象为空。下面是一个输出示例:
{}
{}
{}
Run Code Online (Sandbox Code Playgroud)
脚本中的注释代码确实会产生输出,但这只是字符串值,因为我们没有包括反序列化:
{"body":"{\"FetchTimestampUtc\": 2018-10-16T09:21:40.6173187Z, \"Username\": ... }}
Run Code Online (Sandbox Code Playgroud)
我想知道反斜杠是否应该加倍,如上面链接中给出的示例?这可能可以通过from_json函数的 options 参数实现:“控制解析的选项。接受与 json 数据源相同的选项。” 但我还没有找到选项格式的文档。
任何想法为什么反序列化/序列化不起作用?
看来输入 JSON 必须具有特定的语法。字段值必须是字符串,不允许使用时间戳(整数、浮点数等可能也是如此)。类型转换必须在 Databricks 脚本内完成。
\n\n我更改了输入 JSON,以便引用时间戳值。在架构中,我还更改DateType为TimestampType(这更合适),而不是StringType.
通过使用以下选择表达式:
\n\nstream_data_body = stream_data \\\n .select(from_json(stream_data.body.cast(\'string\'), schema).alias(\'body\')) \\\n .select(to_json(\'body\').alias(\'body\'))\nRun Code Online (Sandbox Code Playgroud)\n\n输出文件中产生以下输出:
\n\n{"body":"{\\"FetchTimestampUtc\\":\\"2018-11-29T21:26:40.039Z\\",\\"Username\\":\\"xyz\\",\\"Name\\":\\"x\\",\\"TweetedBy\\":\\"xyz\\",\\"Location\\":\\"\\",\\"TweetText\\":\\"RT @z123: I just want to say thanks to everyone who interacts with me, whether they talk or they just silently rt or like, thats okay.\xe2\x80\xa6\\"}"}\nRun Code Online (Sandbox Code Playgroud)\n\n尽管时间戳值作为字符串值输出,但这是预期的结果。事实上,整个 body 对象都是作为字符串输出的。
\n\n如果输入格式是具有本机字段类型的正确 JSON,我无法使摄取正常工作。在这种情况下, 的输出from_json始终为空。
编辑:\n这对我来说似乎很混乱。日期值应始终在 JSON 中引用,它们不是“本机”类型。
\n\n我已经测试过整数和浮点值可以不带引号传递,以便可以用它们进行计算。
\n| 归档时间: |
|
| 查看次数: |
1877 次 |
| 最近记录: |