相关疑难解决方法(0)

在PySpark中的GroupedData上应用UDF(具有正常运行的python示例)

我有这个在pandas数据帧中本地运行的python代码:

df_result = pd.DataFrame(df
                          .groupby('A')
                          .apply(lambda x: myFunction(zip(x.B, x.C), x.name))
Run Code Online (Sandbox Code Playgroud)

我想在PySpark中运行它,但在处理pyspark.sql.group.GroupedData对象时遇到问题.

我尝试过以下方法:

sparkDF
 .groupby('A')
 .agg(myFunction(zip('B', 'C'), 'A')) 
Run Code Online (Sandbox Code Playgroud)

返回

KeyError: 'A'
Run Code Online (Sandbox Code Playgroud)

我推测因为'A'不再是一列而我找不到x.name的等价物.

然后

sparkDF
 .groupby('A')
 .map(lambda row: Row(myFunction(zip('B', 'C'), 'A'))) 
 .toDF()
Run Code Online (Sandbox Code Playgroud)

但是得到以下错误:

AttributeError: 'GroupedData' object has no attribute 'map'
Run Code Online (Sandbox Code Playgroud)

任何建议将非常感谢!

python user-defined-functions apache-spark apache-spark-sql pyspark

27
推荐指数
4
解决办法
2万
查看次数

如何定义自定义聚合函数来对一列向量求和?

我有一个两列的DataFrame,ID类型IntVec类型Vector(org.apache.spark.mllib.linalg.Vector).

DataFrame如下所示:

ID,Vec
1,[0,0,5]
1,[4,0,1]
1,[1,2,1]
2,[7,5,0]
2,[3,3,4]
3,[0,8,1]
3,[0,0,1]
3,[7,7,7]
....
Run Code Online (Sandbox Code Playgroud)

我想groupBy($"ID")通过对向量求和来对每个组内的行应用聚合.

上述示例的所需输出将是:

ID,SumOfVectors
1,[5,2,7]
2,[10,8,4]
3,[7,15,9]
...
Run Code Online (Sandbox Code Playgroud)

可用的聚合函数将不起作用,例如,df.groupBy($"ID").agg(sum($"Vec")将导致ClassCastException.

如何实现自定义聚合函数,允许我进行向量或数组的总和或任何其他自定义操作?

scala aggregate-functions apache-spark apache-spark-sql apache-spark-ml

22
推荐指数
1
解决办法
1万
查看次数

Spark在groupBy/aggregate中合并/组合数组

以下Spark代码正确演示了我想要做的事情,并使用一个小的演示数据集生成正确的输出.

当我在大量生产数据上运行相同的一般类型的代码时,我遇到了运行时问题.Spark作业在我的集群上运行约12个小时并失败.

只是看一下下面的代码,爆炸每一行似乎都是低效的,只是将它合并回来.在给定的测试数据集中,第四行包含array_value_1中的三个值和array_value_2中的三个值,它们将爆炸为3*3或9个爆炸行.

那么,在一个更大的数据集中,一行有五个这样的数组列,每列有十个值,会爆炸成10 ^ 5个爆炸行?

查看提供的Spark函数,没有开箱即用的功能可以实现我想要的功能.我可以提供用户定义的功能.这有什么速度缺点吗?

val sparkSession = SparkSession.builder.
  master("local")
  .appName("merge list test")
  .getOrCreate()

val schema = StructType(
  StructField("category", IntegerType) ::
    StructField("array_value_1", ArrayType(StringType)) ::
    StructField("array_value_2", ArrayType(StringType)) ::
    Nil)

val rows = List(
  Row(1, List("a", "b"), List("u", "v")),
  Row(1, List("b", "c"), List("v", "w")),
  Row(2, List("c", "d"), List("w")),
  Row(2, List("c", "d", "e"), List("x", "y", "z"))
)

val df = sparkSession.createDataFrame(rows.asJava, schema)

val dfExploded = df.
  withColumn("scalar_1", explode(col("array_value_1"))).
  withColumn("scalar_2", explode(col("array_value_2")))

// This will output 19. 2*2 + 2*2 + 2*1 + …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql

9
推荐指数
1
解决办法
1万
查看次数

如何在Spark中对GroupedData进行自定义操作?

我想重写一些用RDD编写的代码来使用DataFrames.在我找到这个之前,它工作得非常顺利:

 events
  .keyBy(row => (row.getServiceId + row.getClientCreateTimestamp + row.getClientId, row) )
  .reduceByKey((e1, e2) => if(e1.getClientSendTimestamp <= e2.getClientSendTimestamp) e1 else e2)
  .values
Run Code Online (Sandbox Code Playgroud)

它很简单

 events
  .groupBy(events("service_id"), events("client_create_timestamp"), events("client_id"))
Run Code Online (Sandbox Code Playgroud)

但下一步是什么?如果我想迭代当前组中的每个元素怎么办?它甚至可能吗?提前致谢.

grouping scala apache-spark

6
推荐指数
1
解决办法
3267
查看次数

将自定义函数应用于spark数据帧组

我有一个非常大的时间序列数据表,其中包含以下列:

  • 时间戳
  • LicensePlate
  • UberRide#
  • 速度

应考虑整组数据处理每个LicensePlate/UberRide数据集合.换句话说,我不需要逐行处理数据,而是将所有行按(LicensePlate/UberRide)一起分组.

我计划在数据帧api中使用spark,但我对如何在spark分组数据帧上执行自定义计算感到困惑.

我需要做的是:

  1. 获取所有数据
  2. 按一些列分组
  3. Foreach spark数据帧组应用af(x).返回自定义对象foreach组
  4. 通过应用g(x)并返回单个自定义对象来获取结果

我该怎么做第3步和第4步?我应该使用哪些关于spark API(数据帧,数据集,rdd,也许是pandas ......)的提示?

整个工作流程如下:

工作流程

group-by dataset dataframe apache-spark pyspark

6
推荐指数
2
解决办法
1万
查看次数

PySpark数据帧上的自定义聚合

我有一个PySpark DataFrame,其中一列作为一个热编码向量.我想在groupby之后通过向量加法来聚合不同的一个热编码向量

例如 df[userid,action] Row1: ["1234","[1,0,0]] Row2: ["1234", [0 1 0]]

我希望输出为行:["1234", [ 1 1 0]]因此向量是所有向量分组的总和userid.

我怎样才能做到这一点?PySpark sum聚合操作不支持向量加法.

aggregate-functions user-defined-functions apache-spark apache-spark-sql pyspark

6
推荐指数
1
解决办法
6030
查看次数

在 PySpark 中计算加权平均值

我正在尝试计算 pyspark 中的加权平均值,但没有取得很大进展

# Example data
df = sc.parallelize([
    ("a", 7, 1), ("a", 5, 2), ("a", 4, 3),
    ("b", 2, 2), ("b", 5, 4), ("c", 1, -1)
]).toDF(["k", "v1", "v2"])
df.show()

import numpy as np
def weighted_mean(workclass, final_weight):
    return np.average(workclass, weights=final_weight)

weighted_mean_udaf = pyspark.sql.functions.udf(weighted_mean,
    pyspark.sql.types.IntegerType())
Run Code Online (Sandbox Code Playgroud)

但是当我尝试执行这段代码时

df.groupby('k').agg(weighted_mean_udaf(df.v1,df.v2)).show()
Run Code Online (Sandbox Code Playgroud)

我收到错误

u"expression 'pythonUDF' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you …
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark

5
推荐指数
1
解决办法
3822
查看次数

如何在Spark SQL中找到分组Vector列的平均值?

RelationalGroupedDataset通过调用创建了一个instances.groupBy(instances.col("property_name")):

val x = instances.groupBy(instances.col("property_name"))
Run Code Online (Sandbox Code Playgroud)

如何组合用户定义的聚合函数来对每个组执行Statistics.colStats().mean

谢谢!

aggregate-functions user-defined-functions apache-spark apache-spark-sql apache-spark-ml

5
推荐指数
1
解决办法
2327
查看次数

Spark 中的累积乘积?

我尝试在 Spark scala 中实现一个累积产品,但我真的不知道如何实现。我有以下数据框:

Input data:
+--+--+--------+----+
|A |B | date   | val|
+--+--+--------+----+
|rr|gg|20171103| 2  |
|hh|jj|20171103| 3  |
|rr|gg|20171104| 4  |
|hh|jj|20171104| 5  |
|rr|gg|20171105| 6  |
|hh|jj|20171105| 7  |
+-------+------+----+
Run Code Online (Sandbox Code Playgroud)

我想要以下输出

Output data:
+--+--+--------+-----+
|A |B | date   | val |
+--+--+--------+-----+
|rr|gg|20171105| 48  | // 2 * 4 * 6
|hh|jj|20171105| 105 | // 3 * 5 * 7
+-------+------+-----+
Run Code Online (Sandbox Code Playgroud)

如果您对如何做有任何想法,那将非常有帮助:)

非常感谢

scala aggregation apache-spark apache-spark-sql

0
推荐指数
1
解决办法
1551
查看次数