bib*_*bib 5 python apache-spark word2vec pyspark fasttext
我想在pyspark应用程序中使用预训练嵌入模型(fasttext).
因此,如果我广播文件(.bin),则抛出以下异常:Traceback(最近一次调用last):
cPickle.PicklingError: Could not serialize broadcast: OverflowError: cannot serialize a string larger than 2 GiB
Run Code Online (Sandbox Code Playgroud)
相反,我试图用sc.addFile(modelpath)其中modelpath=path/to/model.bin如下:
我创建了一个名为fasttextSpark.py的文件
import gensim
from gensim.models.fasttext import FastText as FT_gensim
# Load model (loads when this library is being imported)
model = FT_gensim.load_fasttext_format("/project/6008168/bib/wiki.en.bin")
# This is the function we use in UDF to predict the language of a given msg
def get_vector(msg):
pred = model[msg]
return pred
Run Code Online (Sandbox Code Playgroud)
和testSubmit.sh:
#!/bin/bash
#SBATCH -N 2
#SBATCH -t 00:10:00
#SBATCH --mem 20000
#SBATCH --ntasks-per-node 1
#SBATCH --cpus-per-task 32
module load python/2.7.14
source "/project/6008168/bib/ENV2.7.14/bin/activate"
module load spark/2.3.0
spark-submit /project/6008168/bib/test.py
Run Code Online (Sandbox Code Playgroud)
和test.py:
from __future__ import print_function
import sys
import time
import math
import csv
import datetime
import StringIO
import pyspark
import gensim
from operator import add
from pyspark.sql import *
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from gensim.models.fasttext import FastText as FT_gensim
appName = "bib"
modelpath = "/project/6008168/bib/wiki.en.bin"
conf = (SparkConf()
.setAppName(appName)
.set("spark.executor.memory", "12G")
.set("spark.network.timeout", "800s")
.set("spark.executor.heartbeatInterval", "20s")
.set("spark.driver.maxResultSize", "12g")
.set("spark.executor.instances", 2)
.set("spark.executor.cores", 30)
)
sc = SparkContext(conf = conf)
#model = FT_gensim.load_fasttext_format(modelpath)
sc.addFile(modelpath)
sc.addPyFile("/project/6008168/bib/fasttextSpark.py")
# Import our custom fastText language classifier lib
import fasttextSpark
print ("nights = ", fasttextSpark.get_vector("nights"))
print ("done")
Run Code Online (Sandbox Code Playgroud)
现在,每个节点都将拥有预训练数据集的副本.有些单词不在词汇表中,所以每当我面对这样的单词时,我想为它创建一个随机但固定的向量,并将单词及其向量添加到字典中.
那么,我如何在每个节点中维护这样一个字典呢?
实际上,假设我的rdd跟随my_rdd =(id,sentence),我想通过总结其单词的向量来找到句子的嵌入向量.嵌入模型的加载次数.例如:
假设rdd=("id1", "motorcycle parts"),我的实施是否两次加载模型:一个用于摩托车,一个用于零件?如果是的话,我的方法是无效的?在这种情况下应该采用哪种最佳方法?
小智 1
Python 中的模块变量在模块加载时计算一次。因此,每个解释器都会加载该变量一次,并且只要解释器保持活动状态,该变量就保持活动状态。
然而 Spark 工作进程不共享内存,因此每个工作进程都会有一份字典副本。如果您有广播变量,情况也是如此。
因此,您当前的解决方案尽可能接近您想要的解决方案,而无需使用低级原语(如内存映射)或外部存储。
| 归档时间: |
|
| 查看次数: |
288 次 |
| 最近记录: |