set*_*127 4 apache-spark pyspark
我有一个看起来很简单的问题,但我一直用头撞墙,但没有成功。我基本上是在尝试做与这篇文章相同的事情,只是我不关心该文章的“分组依据”方面,我只想对所有行进行求和。
解释一下链接的帖子,DataFrame 看起来像:
ID,Vec
1,[0,0,5]
2,[3,3,4]
3,[0,8,1]
....
Run Code Online (Sandbox Code Playgroud)
我想按元素对向量求和。
上面示例的所需输出将是一行:
SumOfVectors
[3,11,10]
Run Code Online (Sandbox Code Playgroud)
另一个很大的区别是我使用的是 pyspark,而不是 Scala。我试着rdd.fold()开始工作,但要么工作不一样,要么我无法弄清楚 pyspark 中的语法。
最后一个警告是,我在 ~1MM 行的数据帧和长度 ~10k 的向量上执行此操作,因此这必须相当有效。
谢谢你的帮助!根据评论,下面是一个可重复的玩具数据框。
import numpy as np
from pyspark.ml.linalg import Vectors
n_rows = 100
pdf = np.concatenate([np.array(range(n_rows)), np.random.randn(n_rows), 3*np.random.randn(n_rows)+2, 6*np.random.randn(n_rows)-2]).reshape(n_rows,-1)
dff = map(lambda x: (int(x[0]), Vectors.dense(x[1:])), pdf)
df = spark.createDataFrame(dff,schema=["ID", "Vec"])
Run Code Online (Sandbox Code Playgroud)
df.schema 应该看起来像 StructType(List(StructField(ID,LongType,true),StructField(Vec,VectorUDT,true)))
只是打印df给了我DataFrame[ID: bigint, Vec: vector]
同样重要的是,我在 Spark 2.4
$ spark-submit --version
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.0
/_/
Using Scala version 2.11.12, OpenJDK 64-Bit Server VM, 1.8.0_191
Branch HEAD
Compiled by user ec2-user on 2018-12-07T19:51:27Z
Revision bab859f34a291cb7b3f4e724b59e1b48af69016b
Url git@aws157git.com:/pkg/Aws157BigTop
Type --help for more information.
Run Code Online (Sandbox Code Playgroud)
我认为您必须先将向量列转换为数组,然后才能对其进行聚合。
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql import functions as F
from pyspark.sql import types as T
def vec2array(v):
v = Vectors.dense(v)
array = list([float(x) for x in v])
return array
vec2array_udf = F.udf(vec2array, T.ArrayType(T.FloatType()))
df = df.withColumn('Vec', vec2array_udf('Vec'))
n = len(df.select('Vec').first()[0])
bla = df.agg(F.array(*[F.sum(F.col("Vec")[i]) for i in range(n)]).alias("sum"))
bla.show(truncate=False)
Run Code Online (Sandbox Code Playgroud)
至少在我的版本 pyspark 3.0.0 中,您可以使用 Summarizer 轻松完成此操作。你的 col 需要是DenseVector类型
from pyspark.ml.stat import Summarizer
sdf.agg(Summarizer.mean(sdf.Vec)).show()
Run Code Online (Sandbox Code Playgroud)
我最终弄清楚了这一点(我在撒谎,我的一位同事为我弄清楚了)所以我将在这里发布答案,以防有人遇到同样的问题。
您可以使用fold类似于原始问题中链接的 scala 示例中的操作方式。pyspark 中的语法如下:
# find out how many Xs we're iterating over to establish the range below
vec_df = df.select('Vec')
num_cols = len(vec_df.first().Vec)
# iterate over vector to sum each "column"
vec_sums = vec_df.rdd.fold([0]*num_cols, lambda a,b: [x + y for x, y in zip(a, b)])
Run Code Online (Sandbox Code Playgroud)
简要说明:rdd.fold()有两个参数。第一个是初始化数组,在本例中[0]*num_cols它只是一个 0 的数组。第二个是应用于数组并用于迭代数据帧的每一行的函数。因此,对于每一行,它lambda a,b: [x + y for x, y in zip(a, b)]只是将这一行逐元素添加到迄今为止计算的内容中。
您可以使用我在原始问题中的代码来生成一个玩具数据框来测试它。希望这对某人有帮助。
| 归档时间: |
|
| 查看次数: |
3580 次 |
| 最近记录: |