PySpark 和 Protobuf 反序列化 UDF 问题

Mar*_*lis 8 python pyspark databricks azure-databricks protobuf-python

我收到这个错误

Can't pickle <class 'google.protobuf.pyext._message.CMessage'>: it's not found as google.protobuf.pyext._message.CMessage

当我尝试在 PySpark 中创建 UDF 时。显然,它使用 CloudPickle 来序列化命令,但是,我知道 protobuf 消息包含C++实现,这意味着它不能被腌制。

我试图找到一种方法来覆盖CloudPickleSerializer,但是,我找不到方法。

这是我的示例代码:

from MyProject.Proto import MyProtoMessage
from google.protobuf.json_format import MessageToJson
import pyspark.sql.functions as F

def proto_deserialize(body):
  msg = MyProtoMessage()
  msg.ParseFromString(body)
  return MessageToJson(msg)

from_proto = F.udf(lambda s: proto_deserialize(s))

base.withColumn("content", from_proto(F.col("body")))
Run Code Online (Sandbox Code Playgroud)

提前致谢。

H. *_*hez 0

从版本 3.4.0 开始,pyspark 包含一种反序列化 protobuf 二进制消息的机制,您可以在其中指定消息描述符文件的路径。在 3.5.0 中添加了允许传递描述符二进制文件的参数。这样您就可以避免 UDF 开销和错误:)

from pyspark.sql.protobuf.functions import from_protobuf
spark = ...
descriptor_file_path = ...

# Assume we have a protobuf message called MyMessage and that
# the dataframe has the binary data of the messages in a column
# called `data`
df = spark.createDataFrame(...)
df.withColumn(
    'deserialized',
    from_frotobuf(
        data='data',
        messageName='MyMessage',
        descFilePath=descriptor_file_path,
        # binaryDescriptorSet=... # If you rather provide the binary descriptor
    )
)
Run Code Online (Sandbox Code Playgroud)

有关更多信息,请参阅此处的文档。