我发现这个有用的教程是关于使用低级BLAS函数(在Cython中实现)来获得比python中标准的numpy线性代数例程更快的速度.现在,我已成功地使矢量产品正常工作.首先,我保存以下内容linalg.pyx:
import cython
import numpy as np
cimport numpy as np
from libc.math cimport exp
from libc.string cimport memset
from scipy.linalg.blas import fblas
REAL = np.float64
ctypedef np.float64_t REAL_t
cdef extern from "/home/jlorince/flda/voidptr.h":
void* PyCObject_AsVoidPtr(object obj)
ctypedef double (*ddot_ptr) (const int *N, const double *X, const int *incX, const double *Y, const int *incY) nogil
cdef ddot_ptr ddot=<ddot_ptr>PyCObject_AsVoidPtr(fblas.ddot._cpointer) # vector-vector multiplication
cdef int ONE = 1
def vec_vec(syn0, syn1, size):
cdef int lSize = …Run Code Online (Sandbox Code Playgroud) 我有一个Python类,我用它来加载和处理Spark中的一些数据.在我需要做的各种事情中,我正在生成一个从Spark数据帧中的各个列派生的虚拟变量列表.我的问题是我不确定如何正确定义用户定义函数来完成我需要的东西.
我做目前有,当映射了潜在的数据帧RDD,解决了问题的一半(记住,这是在一个更大的方法等data_processor类):
def build_feature_arr(self,table):
# this dict has keys for all the columns for which I need dummy coding
categories = {'gender':['1','2'], ..}
# there are actually two differnt dataframes that I need to do this for, this just specifies which I'm looking at, and grabs the relevant features from a config file
if table == 'users':
iter_over = self.config.dyadic_features_to_include
elif table == 'activty':
iter_over = self.config.user_features_to_include
def _build_feature_arr(row):
result = []
row = row.asDict()
for …Run Code Online (Sandbox Code Playgroud) python apache-spark apache-spark-sql apache-spark-ml apache-spark-mllib
我已经通过Python API在spark中成功训练了LDA模型:
from pyspark.mllib.clustering import LDA
model=LDA.train(corpus,k=10)
Run Code Online (Sandbox Code Playgroud)
这完全正常,但我现在需要LDA模型的文档 -主题矩阵,但据我所知,我能得到的是使用的单词 -topic model.topicsMatrix().
有没有办法从LDA模型中获取文档主题矩阵,如果没有,是否有一种替代方法(除了从零开始实现LDA)以运行LDA模型,它将为我提供我需要的结果?
编辑:
经过深入挖掘后,我在Java api中找到了DistributedLDAModel的文档topicDistributions(),我认为这正是我所需要的(但我敢肯定,如果Pyspark中的LDAModel实际上是一个在引擎盖下的DistributedLDAModel ...).
无论如何,我可以像这样间接调用这个方法,没有任何明显的失败:
In [127]: model.call('topicDistributions')
Out[127]: MapPartitionsRDD[3156] at mapPartitions at PythonMLLibAPI.scala:1480
Run Code Online (Sandbox Code Playgroud)
但是如果我真的看结果,我得到的只是字符串告诉我结果实际上是一个Scala元组(我认为):
In [128]: model.call('topicDistributions').take(5)
Out[128]:
[{u'__class__': u'scala.Tuple2'},
{u'__class__': u'scala.Tuple2'},
{u'__class__': u'scala.Tuple2'},
{u'__class__': u'scala.Tuple2'},
{u'__class__': u'scala.Tuple2'}]
Run Code Online (Sandbox Code Playgroud)
也许这通常是正确的方法,但有没有办法获得实际结果?
简单的问题我还没有找到答案:
给定一个pandas系列,我认为 Series.unique()给出的值的顺序是它们在系列中首次遇到的顺序,而不是任何排序顺序.即
from pandas import Series
s = Series(['b','b','b','a','a','b'])
s.unique()
>>> array(['b', 'a'], dtype=object)
Run Code Online (Sandbox Code Playgroud)
这是我想要的应用程序的行为,但有人可以告诉我,如果我保证得到这个订单?文件不清楚.
Spark的StringIndexer非常有用,但是通常需要检索生成的索引值和原始字符串之间的对应关系,看起来应该有一种内置的方法来完成它.我将使用Spark文档中的这个简单示例来说明:
from pyspark.ml.feature import StringIndexer
df = sqlContext.createDataFrame(
[(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
["id", "category"])
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed_df = indexer.fit(df).transform(df)
Run Code Online (Sandbox Code Playgroud)
这个简化的案例给了我们:
+---+--------+-------------+
| id|category|categoryIndex|
+---+--------+-------------+
| 0| a| 0.0|
| 1| b| 2.0|
| 2| c| 1.0|
| 3| a| 0.0|
| 4| a| 0.0|
| 5| c| 1.0|
+---+--------+-------------+
Run Code Online (Sandbox Code Playgroud)
所有精细和花花公子,但对于许多用例,我想知道我的原始字符串和索引标签之间的映射.我能想到的最简单的方法就是这样:
In [8]: indexed.select('category','categoryIndex').distinct().show()
+--------+-------------+
|category|categoryIndex|
+--------+-------------+
| b| 2.0|
| c| 1.0|
| a| 0.0|
+--------+-------------+ …Run Code Online (Sandbox Code Playgroud) python apache-spark apache-spark-sql pyspark apache-spark-ml
我在Spark中将模型实现为python类,每当我尝试将类方法映射到RDD时,它都会失败.我的实际代码更复杂,但这个简化版本是问题的核心:
class model(object):
def __init__(self):
self.data = sc.textFile('path/to/data.csv')
# other misc setup
def run_model(self):
self.data = self.data.map(self.transformation_function)
def transformation_function(self,row):
row = row.split(',')
return row[0]+row[1]
Run Code Online (Sandbox Code Playgroud)
现在,如果我像这样运行模型(例如):
test = model()
test.run_model()
test.data.take(10)
Run Code Online (Sandbox Code Playgroud)
我收到以下错误:
例外:您似乎尝试从广播变量,操作或转换引用SparkContext.SparkContext只能在驱动程序上使用,而不能在工作程序上运行的代码中使用.有关更多信息,请参阅SPARK-5063.
我已经玩了一下这个,当我尝试将类方法映射到类中的RDD时,它似乎可靠地发生.我已经确认,如果我在类结构之外实现,映射函数可以正常工作,所以问题肯定与类有关.有办法解决这个问题吗?
我在约100万个项目上运行k-means聚类(每个项目表示为~100个特征向量).我已经为各种k运行了聚类,现在想要使用sklearn中实现的轮廓分数来评估不同的结果.试图在没有采样的情况下运行它似乎不可行并且需要花费相当长的时间,因此我假设我需要使用采样,即:
metrics.silhouette_score(feature_matrix, cluster_labels, metric='euclidean',sample_size=???)
Run Code Online (Sandbox Code Playgroud)
然而,我并不清楚适当的采样方法是什么.在给定矩阵大小的情况下,对于使用什么尺寸的样本,是否有经验法则?采用我的分析机可以处理的最大样本,或者采用更小样本的平均值更好吗?
我在很大程度上要求,因为我的初步测试(使用sample_size = 10000)产生了一些非常不直观的结果.
我也愿意采用其他更具可扩展性的评估指标.
编辑以显示问题:对于不同的样本大小,该图显示了作为聚类数量函数的轮廓得分 
有点奇怪的是,增加样本量似乎可以减少噪音.奇怪的是,鉴于我有100万个非常异质的向量,2或3是"最佳"聚类数.换句话说,当我增加簇的数量时,我会发现轮廓得分或多或少单调减少,这是不直观的.
我想上存储为在此描述的火花RowMatrix矩阵运行一些基本线性代数运算(具体地转置,点积,和倒数)这里(使用Python API)。按照文档中的示例(就我而言,矩阵中将有更多行,因此需要 Spark),假设我有这样的内容:
from pyspark.mllib.linalg.distributed import RowMatrix
# Create an RDD of vectors.
rows = sc.parallelize([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]])
# Create a RowMatrix from an RDD of vectors.
mat = RowMatrix(rows)
Run Code Online (Sandbox Code Playgroud)
给定这样一个分布式矩阵,是否有用于进行矩阵转置和点积的现有例程,例如:
dot(mat.T,mat)
Run Code Online (Sandbox Code Playgroud)
或矩阵逆?
inverse(mat)
Run Code Online (Sandbox Code Playgroud)
我似乎无法在文档中找到关于此的任何内容。寻找 (a) 指向相关文档的指针或 (b) 自己实现的方法。
我一直在遇到一些Spark LDA主题建模中的各种问题(主要是看似随机间隔的解除错误)我一直在运行,我认为这主要与我的执行器上的内存分配不足有关.这似乎与有问题的自动群集配置有关.我的最新尝试使用n1-standard-8机器(8核,30GB RAM)用于主节点和工作节点(6个工作站,因此总共48个核心).
但是当我看到时,/etc/spark/conf/spark-defaults.conf我看到了这个:
spark.master yarn-client
spark.eventLog.enabled true
spark.eventLog.dir hdfs://cluster-3-m/user/spark/eventlog
# Dynamic allocation on YARN
spark.dynamicAllocation.enabled true
spark.dynamicAllocation.minExecutors 1
spark.dynamicAllocation.initialExecutors 100000
spark.dynamicAllocation.maxExecutors 100000
spark.shuffle.service.enabled true
spark.scheduler.minRegisteredResourcesRatio 0.0
spark.yarn.historyServer.address cluster-3-m:18080
spark.history.fs.logDirectory hdfs://cluster-3-m/user/spark/eventlog
spark.executor.cores 4
spark.executor.memory 9310m
spark.yarn.executor.memoryOverhead 930
# Overkill
spark.yarn.am.memory 9310m
spark.yarn.am.memoryOverhead 930
spark.driver.memory 7556m
spark.driver.maxResultSize 3778m
spark.akka.frameSize 512
# Add ALPN for Bigtable
spark.driver.extraJavaOptions -Xbootclasspath/p:/usr/local/share/google/alpn/alpn-boot-8.1.3.v20150130.jar
spark.executor.extraJavaOptions -Xbootclasspath/p:/usr/local/share/google/alpn/alpn-boot-8.1.3.v20150130.jar
Run Code Online (Sandbox Code Playgroud)
但这些价值观没有多大意义.为什么只使用4/8执行器核心?并且只有9.3/30GB RAM?我的印象是所有这些配置都应该自动处理,但即使是我手动调整的尝试也没有让我到处都是.
例如,我尝试使用以下命令启动shell:
spark-shell --conf spark.executor.cores=8 --conf spark.executor.memory=24g
Run Code Online (Sandbox Code Playgroud)
但后来失败了
java.lang.IllegalArgumentException: Required executor memory (24576+930 MB) is above the max threshold …Run Code Online (Sandbox Code Playgroud) lda apache-spark google-cloud-platform google-cloud-dataproc
据推测,t-SNE可以扩展到数百万次观察(见这里),但我很好奇这是怎么回事,至少在Sklearn实现中如此.
我正在尝试一个包含~100k项目的数据集,每个项目都有~190个特征.现在,我知道我可以用例如PCA进行降维的第一步,但问题似乎更为根本.
t-SNE计算并存储为输入观测值计算的完整,密集的相似性矩阵(我已经通过查看源证实了这一点).就我而言,这是一个100 亿元素的密集矩阵,它本身需要80 GB +的内存.将此推断为仅一百万个观测值,并且您正在查看8 TB的RAM以存储距离矩阵(更不用说计算时间......)
那么,我们怎样才能在sklearn实现中将t-SNE扩展到数百万个数据点?我错过了什么吗?该sklearn文档至少意味着它可能:
默认情况下,梯度计算算法使用在O(NlogN)时间内运行的Barnes-Hut近似.method ='exact'将在O(N ^ 2)时间内以较慢但精确的算法运行.当最近邻误差需要优于3%时,应使用精确算法.但是,确切的方法无法扩展到数百万个示例.
这是我的重点,但我肯定会读到,因为暗示Barnes-hut方法可以扩展到数百万个例子,但我会重申,代码需要在我们得到任何实际t-之前很好地计算全距离矩阵.冷酷变换(有或没有Barnes-hut).
我错过了什么吗?是否有可能将其扩展到数百万个数据点?
python ×9
apache-spark ×6
pyspark ×3
lda ×2
scikit-learn ×2
blas ×1
cython ×1
matrix ×1
numpy ×1
pandas ×1