我想使用 Kafka 和 Spark 进行情感分析。我想要做的是从 Kafka 读取流数据,然后使用 Spark 对数据进行批处理。之后,我想使用我使用 Tensorflow 制作的函数 sensePredict() 分析批处理。这是我到目前为止所做的......
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'
# Spark
from pyspark import SparkContext
# Spark Streaming
from pyspark.streaming import StreamingContext
# Kafka
from pyspark.streaming.kafka import KafkaUtils
# json parsing
import json
from multiprocessing import Lock
lock = Lock()
numDimensions = 300
maxSeqLength = 70
batchSize = 24
lstmUnits = 128
numClasses = 2
iterations = 100000
import numpy as np
import pickle
from nltk.tokenize import word_tokenize
import …Run Code Online (Sandbox Code Playgroud) 我试图实现的总体目标是将 Keras 模型发送给每个 Spark 工作器,以便我可以在应用于 DataFrame 列的 UDF 中使用该模型。为此,Keras 模型需要是可腌制的。
似乎很多人通过猴子修补 Model 类来成功腌制 keras 模型,如下面的链接所示:
http://zachmoshe.com/2017/04/03/pickling-keras-models.html
但是,我还没有看到任何关于如何与 Spark 一起执行此操作的示例。我的第一次尝试只是make_keras_picklable()在驱动程序中运行该函数,这允许我在驱动程序中腌制和取消腌制模型,但我无法在 UDF 中腌制模型。
def make_keras_picklable():
"Source: https://zachmoshe.com/2017/04/03/pickling-keras-models.html"
...
make_keras_picklable()
model = Sequential() # etc etc
def score(case):
....
score = model.predict(case)
...
def scoreUDF = udf(score, ArrayType(FloatType()))
Run Code Online (Sandbox Code Playgroud)
我得到的错误表明在 UDF 中解压模型没有使用猴子修补的模型类。
AttributeError: 'Sequential' object has no attribute '_built'
Run Code Online (Sandbox Code Playgroud)
看起来另一个用户在这篇 SO 帖子中遇到了类似的错误,答案是“make_keras_picklable()也在每个工作人员上运行”。没有给出如何做到这一点的例子。
我的问题是:召集make_keras_picklable()所有工人的适当方式是什么?
我尝试使用broadcast()(见下文)但得到了与上面相同的错误。
def make_keras_picklable():
"Source: https://zachmoshe.com/2017/04/03/pickling-keras-models.html"
...
make_keras_picklable()
spark.sparkContext.broadcast(make_keras_picklable())
model = Sequential() …Run Code Online (Sandbox Code Playgroud)