Tgs*_*591 10 python performance machine-learning pyspark spark-dataframe
我知道有一千个问题涉及如何通过盐键等来最好地划分你的DataFrames或者RDDs,但我认为这种情况不同,足以保证自己的问题.
我正在PySpark中构建协同过滤推荐引擎,这意味着需要比较每个用户(行)的唯一项目评级.因此,对于一DataFrame维的M (rows) x N (columns),这意味着该数据集变得M x (K choose 2)其中K << N为非空为用户(即,评估)元素的数量.
我的算法对于用户评估了大约一定数量项目的数据集非常有效.但是,对于一部分用户评估了大量项目(数量级比同一分区中的其他用户大)的情况,我的数据变得非常偏斜,最后几个分区开始花费大量时间.举一个简单的例子,考虑以下内容DataFrame:
cols = ['id', 'Toy Story', 'UP', 'Die Hard', 'MIB', 'The Shining']
ratings = [
(1, 4.5, 3.5, None, 1.0, None), # user 1
(2, 2.0, None, 5.0, 4.0, 3.0), # user 2
(3, 3.5, 5.0, 1.0, None, 1.0), # user 3
(4, None, None, 4.5, 3.5, 4.0), # user 4
(5, None, None, None, None, 4.5) # user 5
]
sc.parallelize(ratings, 2).toDF(cols)
Run Code Online (Sandbox Code Playgroud)
我的情况呈现出更大的变化DataFrame(约1,000,000个用户和~10k项目),其中一些用户对电影的评分比其他用户大得多.最初,我将我的sparsify DataFrame如下:
def _make_ratings(row):
import numpy as np
non_null_mask = ~np.isnan(row)
idcs = np.where(non_null_mask)[0] # extract the non-null index mask
# zip the non-null idcs with the corresponding ratings
rtgs = row[non_null_mask]
return list(zip(idcs, rtgs))
def as_array(partition):
import numpy as np
for row in partition:
yield _make_ratings(np.asarray(row, dtype=np.float32))
# drop the id column, get the RDD, and make the copy of np.ndarrays
ratings = R.drop('id').rdd\
.mapPartitions(as_array)\
.cache()
Run Code Online (Sandbox Code Playgroud)
然后,我可以通过以下方式检查每个分区所需的相互评级对的数量:
n_choose_2 = (lambda itrbl: (len(itrbl) * (len(itrbl) - 1)) / 2.)
sorted(ratings.map(n_choose_2).glom().map(sum).collect(), reverse=True)
Run Code Online (Sandbox Code Playgroud)
最初,这是我得到的每个分区的相互评级对的分布:
如您所见,这只是不可扩展.所以我第一次尝试解决这个问题就是在源头更智能地划分数据帧.我想出了以下函数,它将随机分割我的数据框行:
def shuffle_partition(X, n_partitions, col_name='shuffle'):
from pyspark.sql.functions import rand
X2 = X.withColumn(col_name, rand())
return X2.repartition(n_partitions, col_name).drop(col_name)
Run Code Online (Sandbox Code Playgroud)
这很有效.应用之后,这是新的发行版:
这肯定会更好,但仍然不是我喜欢的.有必须是更均匀地跨分区分发这些"权力评价者"的方式,但我就是想不通.我一直在考虑按"每个用户的评级数"列进行分区,但这最终会将所有高评级用户集中在一起,而不是将它们分开.
我错过了一些明显的东西吗
我在以下函数中实现了igrinis的解决方案(我确信有一个更优雅的方式来编写它,但我不是非常熟悉DataFrameAPI,所以我回到RDD这个 - 批评欢迎),但分发是和原版大致相同,所以不确定我是否做错了......:
def partition_by_rating_density(X, id_col_name, n_partitions,
partition_col_name='partition'):
"""Segment partitions by rating density. Partitions will be more
evenly distributed based on the number of ratings for each user.
Parameters
----------
X : PySpark DataFrame
The ratings matrix
id_col_name : str
The ID column name
n_partitions : int
The number of partitions in the new DataFrame.
partition_col_name : str
The name of the partitioning column
Returns
-------
with_partition_key : PySpark DataFrame
The partitioned DataFrame
"""
ididx = X.columns.index(id_col_name)
def count_non_null(row):
sm = sum(1 if v is not None else 0
for i, v in enumerate(row) if i != ididx)
return row[ididx], sm
# add the count as the last element and id as the first
counted = X.rdd.map(count_non_null)\
.sortBy(lambda r: r[-1], ascending=False)
# get the count array out, zip it with the index, and then flatMap
# it out to get the sorted index
indexed = counted.zipWithIndex()\
.map(lambda ti: (ti[0][0], ti[1] % n_partitions))\
.toDF([id_col_name, partition_col_name])
# join back with indexed, which now has the partition column
counted_indexed = X.join(indexed, on=id_col_name, how='inner')
# the columns to drop
return counted_indexed.repartition(n_partitions, partition_col_name)\
.drop(partition_col_name)
Run Code Online (Sandbox Code Playgroud)
您可以做的是按照评级数获取用户的排序列表,然后将列中的索引除以分区数.将除法的剩余部分作为列,然后partitionBy()在该列上重新分区.这样,您的分区将具有几乎相同的所有用户评级计数表示.
对于3个分区,这将为您提供:
[1000, 800, 700, 600, 200, 30, 10, 5] - number of ratings
[ 0, 1, 2, 3, 4, 5, 6, 7] - position in sorted index
[ 0, 1, 2, 0, 1, 2, 0, 1] - group to partition by
Run Code Online (Sandbox Code Playgroud)