我在 Python 中有一个 Azure Databricks 脚本,它使用结构化流从事件中心读取 JSON 消息,处理消息并将结果保存在数据湖存储中。消息从 Azure 逻辑应用发送到事件中心,该应用从 Twitter API 读取推文。
我正在尝试反序列化事件中心消息的正文,以便处理其内容。消息正文首先从二进制转换为字符串值,然后使用该from_json函数反序列化为结构类型,如本文所述:https : //databricks.com/blog/2017/02/23/working-complex-data-格式结构化流媒体apache-spark-2-1.html
这是一个代码示例(带有混淆的参数):
from pyspark.sql.functions import from_json, to_json
from pyspark.sql.types import DateType, StringType, StructType
EVENT_HUB_CONN_STRING = 'Endpoint=sb://myehnamespace.servicebus.windows.net/;SharedAccessKeyName=Listen;SharedAccessKey=xxx;EntityPath=myeh'
OUTPUT_DIR = '/mnt/DataLake/output'
CHECKPOINT_DIR = '/mnt/DataLake/checkpoint'
event_hub_conf = {
'eventhubs.connectionString' : EVENT_HUB_CONN_STRING
}
stream_data = spark \
.readStream \
.format('eventhubs') \
.options(**event_hub_conf) \
.option('multiLine', True) \
.option('mode', 'PERMISSIVE') \
.load()
schema = StructType() \
.add('FetchTimestampUtc', DateType()) \
.add('Username', StringType()) \
.add('Name', StringType()) \
.add('TweetedBy', StringType()) \ …Run Code Online (Sandbox Code Playgroud) azure azure-eventhub pyspark databricks spark-structured-streaming