Using a tensorflow.keras model inside a PySpark UDF raises a pickling error

Tags: user-defined-functions, apache-spark, pyspark, keras, tensorflow

I want to use a tensorflow.keras model inside a PySpark pandas_udf. However, I get a pickling error when the model is serialized before being sent to the workers. I am not sure I am taking the best approach for what I want to do, so here is a minimal but complete example.

Packages:

  • tensorflow-2.2.0 (but all earlier versions trigger the error as well)
  • pyspark-2.4.5

The import statements are:

import pandas as pd
import numpy as np

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense

from pyspark.sql import SparkSession, functions as F, types as T

The PySpark UDF is a pandas_udf:

def compute_output_pandas_udf(model):
    '''Spark pandas udf for model prediction.'''

    @F.pandas_udf(T.DoubleType(), F.PandasUDFType.SCALAR)
    def compute_output(inputs1, inputs2, inputs3):
        pdf = pd.DataFrame({
            'input1': inputs1,
            'input2': inputs2,
            'input3': inputs3
        })
        pdf['predicted_output'] = model.predict(pdf.values)
        return pdf['predicted_output']

    return compute_output

The main code is:

# Model parameters
weights = np.array([[0.5], [0.4], [0.3]])
bias = np.array([1.25])
activation = 'linear'
input_dim, output_dim = weights.shape

# Initialize model
model = Sequential()
layer = Dense(output_dim, input_dim=input_dim, activation=activation)
model.add(layer)
layer.set_weights([weights, bias])

# Initialize Spark session
spark = SparkSession.builder.appName('test').getOrCreate()

# Create pandas df with inputs and run model
pdf = pd.DataFrame({
    'input1': np.random.randn(200),
    'input2': np.random.randn(200),
    'input3': np.random.randn(200)
})
pdf['predicted_output'] = model.predict(pdf[['input1', 'input2', 'input3']].values)

# Create spark df with inputs and run model using udf
sdf = spark.createDataFrame(pdf)
sdf = sdf.withColumn('predicted_output', compute_output_pandas_udf(model)('input1', 'input2', 'input3'))
sdf.limit(5).show()

This error is triggered when compute_output_pandas_udf(model) is called:

PicklingError: Could not serialize object: TypeError: can't pickle _thread.RLock objects
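For context, the failure does not actually require Spark: PySpark simply pickles the UDF closure (including the captured model) before shipping it to the executors, and a plain pickle call on the model reproduces the underlying error. A minimal check (my own addition, not from the original post):

import pickle

# Minimal reproduction outside Spark: a tf.keras model (as of TF 2.2) holds
# thread locks internally, so plain pickling fails with the same TypeError.
try:
    pickle.dumps(model)
except TypeError as err:
    print(err)  # can't pickle _thread.RLock objects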

I found a page about pickling keras models and tried the approach with tensorflow.keras, but then I get the following error when the model's predict function is called inside the UDF (so serialization works, but deserialization does not?):

AttributeError: 'Sequential' object has no attribute '_distribution_strategy'

Does anyone know how to proceed? Thank you in advance!

PS: Note that I am not using the model from the keras library directly, because I keep running into another error with it that seems harder to fix. Serializing the keras model, however, does not raise an error the way the tensorflow.keras model does.

Answer:

So it looks like if we use the solution from http://zachmoshe.com/2017/04/03/pickling-keras-models.html, which extends the __getstate__ and __setstate__ methods directly on the tensorflow.keras.models.Model class, then the workers cannot deserialize the model, because they do not have that extension of the class.
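For reference, that approach roughly boils down to monkey-patching the pickle hooks onto the Model class itself, something like the sketch below (adapted from the linked article for tensorflow.keras; not the code used here). Since the patching only happens in the driver process, the workers never see the patched class:

import tempfile
import tensorflow

def make_keras_picklable():
    # Sketch of the monkey-patching approach: attach __getstate__/__setstate__
    # to the Model class so every model instance pickles via HDF5 bytes.
    def __getstate__(self):
        with tempfile.NamedTemporaryFile(suffix='.hdf5', delete=True) as fd:
            tensorflow.keras.models.save_model(self, fd.name, overwrite=True)
            model_str = fd.read()
        return {'model_str': model_str}

    def __setstate__(self, state):
        with tempfile.NamedTemporaryFile(suffix='.hdf5', delete=True) as fd:
            fd.write(state['model_str'])
            fd.flush()
            model = tensorflow.keras.models.load_model(fd.name)
        self.__dict__ = model.__dict__

    cls = tensorflow.keras.models.Model
    cls.__getstate__ = __getstate__
    cls.__setstate__ = __setstate__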

The solution is instead to use the wrapper class suggested by Erp12 in this post.

import tempfile
import tensorflow


class ModelWrapperPickable:
    '''Wrap a tensorflow.keras model so it is picklable: the pickled state is the
    model saved to HDF5 bytes, restored with load_model on deserialization.'''

    def __init__(self, model):
        self.model = model

    def __getstate__(self):
        model_str = ''
        with tempfile.NamedTemporaryFile(suffix='.hdf5', delete=True) as fd:
            tensorflow.keras.models.save_model(self.model, fd.name, overwrite=True)
            model_str = fd.read()
        d = { 'model_str': model_str }
        return d

    def __setstate__(self, state):
        with tempfile.NamedTemporaryFile(suffix='.hdf5', delete=True) as fd:
            fd.write(state['model_str'])
            fd.flush()
            self.model = tensorflow.keras.models.load_model(fd.name)
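Since the wrapper's pickled state is nothing more than the model saved to HDF5 bytes, the workers only need the ModelWrapperPickable class (shipped with the UDF closure) and tensorflow to rebuild the model; no patched Model class is required. As a quick sanity check (my own addition, not from the original answer), the wrapper survives a plain pickle round trip with the model built above:

import pickle

# The wrapper pickles cleanly because its state is just the saved-model bytes.
wrapper = ModelWrapperPickable(model)
restored = pickle.loads(pickle.dumps(wrapper))
print(restored.model.predict(np.zeros((1, 3))))  # same weights, same prediction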

The UDF becomes:

def compute_output_pandas_udf(model_wrapper):
    '''Spark pandas udf for model prediction.'''

    @F.pandas_udf(T.DoubleType(), F.PandasUDFType.SCALAR)
    def compute_output(inputs1, inputs2, inputs3):
        pdf = pd.DataFrame({
            'input1': inputs1,
            'input2': inputs2,
            'input3': inputs3
        })
        pdf['predicted_output'] = model_wrapper.model.predict(pdf.values)
        return pdf['predicted_output']

    return compute_output

And the main code:

# Model parameters
weights = np.array([[0.5], [0.4], [0.3]])
bias = np.array([1.25])
activation = 'linear'
input_dim, output_dim = weights.shape

# Initialize keras model
model = Sequential()
layer = Dense(output_dim, input_dim=input_dim, activation=activation)
model.add(layer)
layer.set_weights([weights, bias])
# Initialize model wrapper
model_wrapper = ModelWrapperPickable(model)

# Initialize Spark session
spark = SparkSession.builder.appName('test').getOrCreate()

# Create pandas df with inputs and run model
pdf = pd.DataFrame({
    'input1': np.random.randn(200),
    'input2': np.random.randn(200),
    'input3': np.random.randn(200)
})
pdf['predicted_output'] = model_wrapper.model.predict(pdf[['input1', 'input2', 'input3']].values)

# Create spark df with inputs and run model using udf
sdf = spark.createDataFrame(pdf)
sdf = sdf.withColumn('predicted_output', compute_output_pandas_udf(model_wrapper)('input1', 'input2', 'input3'))
sdf.limit(5).show()
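One possible refinement (an untested sketch, not part of the original answer): because the wrapper is now picklable, it can also be shared through a Spark broadcast variable, so executors reuse a cached deserialized copy instead of unpickling the model with every task closure:

# Hypothetical variant: broadcast the picklable wrapper to the executors.
bc_wrapper = spark.sparkContext.broadcast(model_wrapper)

@F.pandas_udf(T.DoubleType(), F.PandasUDFType.SCALAR)
def compute_output_broadcast(inputs1, inputs2, inputs3):
    pdf = pd.DataFrame({
        'input1': inputs1,
        'input2': inputs2,
        'input3': inputs3
    })
    # .ravel() flattens the (n, 1) prediction array into a 1-D series
    return pd.Series(bc_wrapper.value.model.predict(pdf.values).ravel())

sdf = sdf.withColumn('predicted_output_bc', compute_output_broadcast('input1', 'input2', 'input3'))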