相关疑难解决方法(0)

_pickle.PicklingError:无法序列化对象:TypeError:无法pickle _thread.RLock 对象

我想使用 Kafka 和 Spark 进行情感分析。我想要做的是从 Kafka 读取流数据,然后使用 Spark 对数据进行批处理。之后,我想使用我使用 Tensorflow 制作的函数 sensePredict() 分析批处理。这是我到目前为止所做的......

import os  
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'

#    Spark
from pyspark import SparkContext  
#    Spark Streaming
from pyspark.streaming import StreamingContext  
#    Kafka
from pyspark.streaming.kafka import KafkaUtils  
#    json parsing
import json

from multiprocessing import Lock
lock = Lock()

numDimensions = 300
maxSeqLength = 70
batchSize = 24
lstmUnits = 128
numClasses = 2
iterations = 100000

import numpy as np
import pickle
from nltk.tokenize import word_tokenize
import …
Run Code Online (Sandbox Code Playgroud)

python streaming apache-kafka apache-spark

8
推荐指数
1
解决办法
7243
查看次数

用于 PySpark 的 Pickling 猴子补丁 Keras 模型

我试图实现的总体目标是将 Keras 模型发送给每个 Spark 工作器,以便我可以在应用于 DataFrame 列的 UDF 中使用该模型。为此,Keras 模型需要是可腌制的。

似乎很多人通过猴子修补 Model 类来成功腌制 keras 模型,如下面的链接所示:

http://zachmoshe.com/2017/04/03/pickling-keras-models.html

但是,我还没有看到任何关于如何与 Spark 一起执行此操作的示例。我的第一次尝试只是make_keras_picklable()在驱动程序中运行该函数,这允许我在驱动程序中腌制和取消腌制模型,但我无法在 UDF 中腌制模型。

def make_keras_picklable():
    "Source: https://zachmoshe.com/2017/04/03/pickling-keras-models.html"
    ...

make_keras_picklable()

model = Sequential() # etc etc

def score(case):
    ....
    score = model.predict(case)
    ...

def scoreUDF = udf(score, ArrayType(FloatType()))
Run Code Online (Sandbox Code Playgroud)

我得到的错误表明在 UDF 中解压模型没有使用猴子修补的模型类。

AttributeError: 'Sequential' object has no attribute '_built'
Run Code Online (Sandbox Code Playgroud)

看起来另一个用户在这篇 SO 帖子中遇到了类似的错误,答案是“make_keras_picklable()也在每个工作人员上运行”。没有给出如何做到这一点的例子。

我的问题是:召集make_keras_picklable()所有工人的适当方式是什么?

我尝试使用broadcast()(见下文)但得到了与上面相同的错误。

def make_keras_picklable():
    "Source: https://zachmoshe.com/2017/04/03/pickling-keras-models.html"
    ...

make_keras_picklable()
spark.sparkContext.broadcast(make_keras_picklable())

model = Sequential() …
Run Code Online (Sandbox Code Playgroud)

monkeypatching pickle apache-spark pyspark keras

5
推荐指数
1
解决办法
1175
查看次数