小编whs*_*s2k的帖子

如何将Spark数据帧推送到弹性搜索(Pyspark)

初学ES问题在这里

将Spark Dataframe推送到弹性搜索的工作流程或步骤是什么?

从研究来看,我相信我需要使用spark.newAPIHadoopFile()方法.

但是,通过挖掘弹性搜索文档其他堆栈Q/A,我仍然对参数需要采用何种格式以及为什么有点困惑

请注意,我使用的是pyspark,这是ES的新表(没有索引已存在),df是5列(2个字符串类型,2个长类型和1个整数列表),行数约为3.5M.

python elasticsearch apache-spark-sql pyspark spark-dataframe

6
推荐指数
2
解决办法
3497
查看次数

Pyspark:计算向量列的余弦相似度的最快方法是什么

Pyspark 初学者问题在这里!我有一个大约 2M 行已矢量化文本的数据框(通过 w2v;300 维)。计算每行相对于新的单向量输入的余弦距离的最有效方法是什么?

我当前的方法使用 udf 并需要几分钟,对于我想要创建的 web 应用程序来说太长了。

创建样本 df:

import numpy as np
import pandas as pd
from pyspark.sql.functions import *

column=[]
num_rows = 10000 #change to 2000000 to really slow your computer down!
for x in range(num_rows):
    sample = np.random.uniform(low=-1, high=1, size=(300,)).tolist()
    column.append(sample)
index = range(1000)
df_pd = pd.DataFrame([index, column]).T
#df_pd = pd.concat([df.T[x] for x in df.T], ignore_index=True)
df_pd.head()
df = spark.createDataFrame(df_pd).withColumnRenamed('0', 'Index').withColumnRenamed('1', 'Vectors')
df.show()
Run Code Online (Sandbox Code Playgroud)

创建一个示例输入(我将其创建为 Spark df,以便通过现有管道进行转换):

new_input = np.random.uniform(low=-1, high=1, size=(300,)).tolist()
df_pd_new …
Run Code Online (Sandbox Code Playgroud)

python dataframe apache-spark apache-spark-sql pyspark

6
推荐指数
1
解决办法
5920
查看次数

Pyspark如何从word2vec单词嵌入中计算Doc2Vec?

我有一个pyspark数据框,其中包含大约300k个唯一行的语料库,每个行都有一个"doc",每个文档包含几个文本句子.

在处理之后,我有每行/ doc的200维矢量化表示.我的NLP流程:

  1. 用正则表达式udf删除标点符号
  2. 用nltk雪球udf词干
  3. Pyspark Tokenizer
  4. Word2Vec(ml.feature.Word2Vec,vectorSize = 200,windowSize = 5)

我理解这个实现如何使用skipgram模型根据使用的完整语料库为每个单词创建嵌入.我的问题是:这个实现如何从语料库中每个单词的向量转到每个文档/行的向量?

它与gensim doc2vec实现中的过程相同,它只是简单地将每个文档中的单词向量连接在一起吗?:gensim如何计算doc2vec段落向量.如果是这样,它如何将向量切割到指定大小200(它只使用前200个单词?平均值?)?

我无法从源代码中找到信息:https://spark.apache.org/docs/2.2.0/api/python/_modules/pyspark/ml/feature.html#Word2Vec

任何帮助或参考材料,超级赞赏!

nlp apache-spark word2vec pyspark doc2vec

5
推荐指数
1
解决办法
2456
查看次数

如何查找多个向量均为零的索引

初学者 pySpark 问题在这里:

如何找到所有向量都为零的索引?

经过一系列转换后,我有一个大约 2.5M 行的 Spark df 和一个长度大约为 262K 的 tfidf 稀疏向量。我想执行 PCA 降维,以使这些数据对于多层感知器模型拟合来说更易于管理,但 pyspark 的 PCA 仅限于最多 65,535 列。

+--------------------+
|      tfidf_features| df.count() >>> 2.5M 
+--------------------+ Example Vector:
|(262144,[1,37,75,...| SparseVector(262144, {7858: 1.7047, 12326: 1.2993, 15207: 0.0953, 
|(262144,[0],[0.12...|      24112: 0.452, 40184: 1.7047,...255115: 1.2993, 255507: 1.2993})
|(262144,[0],[0.12...|
|(262144,[0],[0.12...|
|(262144,[0,6,22,3...|
+--------------------+
Run Code Online (Sandbox Code Playgroud)

因此,我想删除稀疏 tfidf 向量的索引或列,这些索引或列对于所有约 2.5M 文档(行)均为零。这有望使我的 PCA 最大值低于 65,535。

我的计划是创建一个 udf,(1)将稀疏向量转换为密集向量(或 np 数组)(2)搜索所有向量以查找所有向量均为零的索引(3)删除索引。然而,我正在努力解决第二部分(找到所有向量为零的索引)。这是我到目前为止的情况,但我认为我的攻击计划太耗时而且不太Pythonic(特别是对于这么大的数据集):

import numpy as np    
row_count = df.count()
def find_zero_indicies(df):
     vectors = df.select('tfidf_features').take(row_count)[0]
     zero_indices = [] …
Run Code Online (Sandbox Code Playgroud)

python numpy sparse-matrix apache-spark pyspark

4
推荐指数
1
解决办法
1061
查看次数