A common structure I see in algorithms that exploit symmetry is:
for (int i = 0; i < n; i++) {
    for (int j = i+1; j < n; j++) {
        [compute x]
        objects[i][j] += x;
        objects[j][i] -= x;
    }
}
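A minimal plain-Python sketch of that pattern (the names fill_antisymmetric and compute are made up for illustration; compute stands in for "[compute x]"):

```python
# Each unordered pair (i, j) with i < j is visited exactly once, and the
# result is written to both halves of the antisymmetric matrix.
def fill_antisymmetric(n, compute):
    objects = [[0.0] * n for _ in range(n)]
    for i in range(n):
        for j in range(i + 1, n):
            x = compute(i, j)   # computed once per unordered pair
            objects[i][j] += x
            objects[j][i] -= x
    return objects

m = fill_antisymmetric(4, lambda i, j: i + j)
# m[i][j] == -m[j][i] holds by construction
```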
This (while still O(n^2)) halves the amount of computation by exploiting the symmetry. Can you tell me what the way to introduce this kind of optimization in PySpark code would be?
For example, I wrote code that computes the force per unit mass acting on each particle in a system, according to the formula (where r is the position):
             N    m_j * (r_i - r_j)
F_i = -G *   Σ   -------------------
           j!=i    |r_i - r_j|^3
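As a sanity check on one term of that sum, here is a hedged plain-Python sketch (the helper name pairwise_term is made up; it is not part of the PySpark code below). With equal masses, the two terms of a pair are exact opposites:

```python
def pairwise_term(r_i, r_j, m_j, G=1.0):
    # One term of the sum: -G * m_j * (r_i - r_j) / |r_i - r_j|^3
    diff = [a - b for a, b in zip(r_i, r_j)]
    dist = sum(d * d for d in diff) ** 0.5
    return [-G * m_j * d / dist ** 3 for d in diff]

r = [(0.0, 0.0, 0.0), (1.0, 0.0, 0.0), (0.0, 2.0, 0.0)]
m = [1.0, 1.0, 1.0]  # equal masses, as assumed later in the question

f01 = pairwise_term(r[0], r[1], m[1])
f10 = pairwise_term(r[1], r[0], m[0])
# f01 and f10 are opposite vectors: this is the F_ij = -F_ji symmetry
```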
In it, I first take the cartesian product of my dataframe with itself to get every pairwise interaction, and then aggregate them all by id to get the total force acting on each particle:
def calc_F(df_clust, G=1):
    # cartesian product of the dataframe with itself
    renameCols = [f"`{col}` as `{col}_other`" for col in df_clust.columns]
    df_cart = df_clust.crossJoin(df_clust.selectExpr(renameCols))
    df_clust_cartesian = df_cart.filter("id != id_other")

    df_F_cartesian = df_clust_cartesian.selectExpr("id", "id_other", "m_other",
                                                   "`x` - `x_other` as `diff(x)`",
                                                   "`y` - `y_other` as `diff(y)`",
                                                   "`z` - `z_other` as `diff(z)`"
                                                   )
    df_F_cartesian = df_F_cartesian.selectExpr("id", "id_other",
                                               "`diff(x)` * `m_other` as `num(x)`",
                                               "`diff(y)` * `m_other` as `num(y)`",
                                               "`diff(z)` * `m_other` as `num(z)`",
                                               "sqrt(`diff(x)` * `diff(x)` + `diff(y)` "
                                               "* `diff(y)` + `diff(z)` * `diff(z)`) as `denom`",
                                               )
    df_F_cartesian = df_F_cartesian.selectExpr("id", "id_other",
                                               "`num(x)` / pow(`denom`, 3) as `Fx`",
                                               "`num(y)` / pow(`denom`, 3) as `Fy`",
                                               "`num(z)` / pow(`denom`, 3) as `Fz`",
                                               )

    # squish back to initial particles
    sumCols = ["Fx", "Fy", "Fz"]
    df_agg = df_F_cartesian.groupBy("id").sum(*sumCols)
    renameCols = [f"`sum({col})` as `{col}`" for col in sumCols]
    df_F = df_agg.selectExpr("id", *renameCols)
    df_F = df_F.selectExpr("id",
                           f"`Fx` * {-G} as Fx",
                           f"`Fy` * {-G} as Fy",
                           f"`Fz` * {-G} as Fz")

    return df_F
But I know that the force between two particles is antisymmetric, F_ij = -F_ji (I am assuming all masses are equal), so here I am computing the forces twice instead of reusing them. So in this particular case I would like to turn df_clust_cartesian = df_cart.filter("id != id_other") into df_clust_cartesian = df_cart.filter("id < id_other"), for example, and somehow reuse those forces when computing the total force in the second part of the function. (Of course, ideally I would like to learn the general way of doing this.)
An example input in this case is:
a = sc.parallelize([
    [0.48593906,-0.52435857,-0.53198230,0.46153894,-0.33775792E-01,-0.32276499,0.15625001E-04,1],
    [-0.65960690E-01,0.80844238E-01,-0.27603051,-0.57578009,1.1078150,-0.29340765,0.15625001E-04,2],
    [-0.34809157E-01,0.76795481E-01,-0.39087987,-0.55399138,-0.17386098,0.59250806E-01,0.15625001E-04,3]
])

from pyspark.sql.types import *

clust_input = StructType([
    StructField('x', DoubleType(), False),
    StructField('y', DoubleType(), False),
    StructField('z', DoubleType(), False),
    StructField('vx', DoubleType(), False),
    StructField('vy', DoubleType(), False),
    StructField('vz', DoubleType(), False),
    StructField('m', DoubleType(), False),
    StructField('id', IntegerType(), False)
])

df_clust = a.toDF(schema=clust_input)
Basically, you only want to compute the formula when id < id_other, and use that result to generate all the elements with id > id_other by symmetry.
You just need to modify your filter to this:
df_clust_cartesian = df_cart.filter("id < id_other")
Then, once you have the dataframe df_F_cartesian, there is one row per pair (id, id_other). You can use that row to also generate the row corresponding to (id_other, id), adding a minus sign to Fx, Fy and Fz.
This can be done by adding the following step before the aggregation:
from pyspark.sql import functions as F

sumCols = ["Fx", "Fy", "Fz"]
oppositeSums = [(-F.col(c)).alias(c) for c in sumCols]
df_F_cartesian = df_F_cartesian.select(F.explode(F.array(
    F.struct(F.col("id"), *sumCols),
    F.struct(F.col("id_other").alias("id"), *oppositeSums)
)).alias("s")).select("s.*")
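To see why this works, here is a hedged plain-Python sketch of the same mirror-and-aggregate idea (not PySpark; the pair_forces data is invented for illustration):

```python
from collections import defaultdict

# One record per pair with id < id_other, as produced by the
# `id < id_other` filter: (id, id_other, Fx, Fy, Fz). Made-up values.
pair_forces = [
    (1, 2, 0.5, -0.25, 0.0),
    (1, 3, -0.1, 0.3, 0.2),
    (2, 3, 0.4, 0.1, -0.6),
]

# Mirror each record: the (id_other, id) row carries the negated force.
# This is what the explode(array(struct, struct)) step does in Spark.
mirrored = []
for i, j, fx, fy, fz in pair_forces:
    mirrored.append((i, fx, fy, fz))
    mirrored.append((j, -fx, -fy, -fz))

# Aggregate by id, like groupBy("id").sum(...)
totals = defaultdict(lambda: [0.0, 0.0, 0.0])
for i, fx, fy, fz in mirrored:
    totals[i][0] += fx
    totals[i][1] += fy
    totals[i][2] += fz
# By construction the mirrored forces cancel pairwise, so the
# per-component sum over all particles is (numerically) zero.
```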