Kit*_*ito 10 python scikit-learn apache-spark spark-streaming
我正在使用两个不同的窗口运行Spark Streaming(在窗口上用SKLearn训练模型,另一个用于根据该模型预测值)我想知道如何避免一个窗口("慢"训练窗口)到训练模型,而不"阻塞""快速"预测窗口.
我的简化代码如下:
conf = SparkConf()
conf.setMaster("local[4]")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 1)
stream = ssc.socketTextStream("localhost", 7000)
import Custom_ModelContainer
### Window 1 ###
### predict data based on model computed in window 2 ###
def predict(time, rdd):
try:
# ... rdd conversion to df, feature extraction etc...
# regular python code
X = np.array(df.map(lambda lp: lp.features.toArray()).collect())
pred = Custom_ModelContainer.getmodel().predict(X)
# send prediction to GUI
except Exception, e: print e
predictionStream = stream.window(60,60)
predictionStream.foreachRDD(predict)
### Window 2 ###
### fit new model ###
def trainModel(time, rdd):
try:
# ... rdd conversion to df, feature extraction etc...
X = np.array(df.map(lambda lp: lp.features.toArray()).collect())
y = np.array(df.map(lambda lp: lp.label).collect())
# train test split etc...
model = SVR().fit(X_train, y_train)
Custom_ModelContainer.setModel(model)
except Exception, e: print e
modelTrainingStream = stream.window(600,600)
modelTrainingStream.foreachRDD(trainModel)
Run Code Online (Sandbox Code Playgroud)
(注意:Custom_ModelContainer是我编写的用于保存和检索训练模型的类)
我的设置通常工作正常,但每次在第二个窗口中训练新模型(大约需要一分钟)时,第一个窗口在模型训练完成之前不会计算预测.实际上,我认为这是有道理的,因为模型拟合和预测都是在主节点上计算的(在非分布式设置中 - 由于SKLearn).
所以我的问题如下:是否可以在单个工作节点(而不是主节点)上训练模型?如果是这样,我怎么能实现后者并且实际上会解决我的问题?
如果没有,关于如何在不延迟窗口1中的计算的情况下进行此类设置的任何其他建议?
任何帮助是极大的赞赏.
编辑:我想更普遍的问题是:如何对两个不同的工作人员并行运行两个不同的任务?
免责声明:这只是一组想法。这些都没有经过实践检验。
您可以尝试以下几件事:
不要。collect 模型通常是可序列化的,因此预测过程可以在集群上轻松处理:predictscikit-learn
def predict(time, rdd):
...
model = Custom_ModelContainer.getmodel()
pred = (df.rdd.map(lambda lp: lp.features.toArray())
.mapPartitions(lambda iter: model.predict(np.array(list(iter)))))
...
Run Code Online (Sandbox Code Playgroud)
它不仅应该并行化预测,而且如果原始数据不传递到 GUI,还应该减少必须收集的数据量。
尝试collect异步发送数据。PySpark 不提供collectAsync方法,但您可以尝试实现类似的功能concurrent.futures:
from pyspark.rdd import RDD
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=4)
def submit_to_gui(*args): ...
def submit_if_success(f):
if not f.exception():
executor.submit(submit_to_gui, f.result())
Run Code Online (Sandbox Code Playgroud)
从 1 继续。
def predict(time, rdd):
...
f = executor.submit(RDD.collect, pred)
f.add_done_callback(submit_if_success)
...
Run Code Online (Sandbox Code Playgroud)如果您确实想使用本地scikit-learn模型,请尝试collect使用fit上述期货。您还可以尝试仅收集一次,特别是在未缓存数据的情况下:
def collect_and_train(df):
y, X = zip(*((p.label, p.features.toArray()) for p in df.collect()))
...
return SVR().fit(X_train, y_train)
def set_if_success(f):
if not f.exception():
Custom_ModelContainer.setModel(f.result())
def trainModel(time, rdd):
...
f = excutor.submit(collect_and_train, df)
f.add_done_callback(set_if_success)
...
Run Code Online (Sandbox Code Playgroud)使用现有解决方案(例如或自定义方法)将训练过程移至集群spark-sklearn:
coalesce(1)并使用mapPartitions.mapPartitions,收集模型并作为整体使用,例如通过采用平均值或中值预测。丢弃scikit-learn并使用可以在分布式流环境中训练和维护的模型(例如StreamingLinearRegressionWithSGD)。
您当前的方法使 Spark 过时了。如果您可以在本地训练模型,那么您很有可能可以在本地计算机上更快地执行所有其他任务。否则你的程序将在 上失败collect。
| 归档时间: |
|
| 查看次数: |
427 次 |
| 最近记录: |