Calculating averages with Spark Scala

akr*_*ckz 3 scala join apache-spark

How can I compute the average salary per location in Spark Scala using the following two datasets?

File1.csv (the 4th column is the salary)

Ram, 30, Engineer, 40000  
Bala, 27, Doctor, 30000  
Hari, 33, Engineer, 50000  
Siva, 35, Doctor, 60000

File2.csv (the 2nd column is the location)

Hari, Bangalore  
Ram, Chennai  
Bala, Bangalore  
Siva, Chennai  

The files above are not sorted. I need to join the two files and find the average salary per location. I tried the code below but couldn't get it to work.

val salary = sc.textFile("File1.csv").map(e => e.split(","))  
val location = sc.textFile("File2.csv").map(e.split(","))  
val joined = salary.map(e=>(e(0),e(3))).join(location.map(e=>(e(0),e(1)))  
val joinedData = joined.sortByKey()  
val finalData = joinedData.map(v => (v._1,v._2._1._1,v._2._2))  
val aggregatedDF = finalData.map(e=> e.groupby(e(2)).agg(avg(e(1))))    
aggregatedDF.repartition(1).saveAsTextFile("output.txt")  

Please help with the code, and show a sample of what the output should look like.

Many thanks.

Leo*_*o C 5

You can read the CSV files as DataFrames, then join them and group to get the averages:

val df1 = spark.read.csv("/path/to/file1.csv").toDF(
  "name", "age", "title", "salary"
)

val df2 = spark.read.csv("/path/to/file2.csv").toDF(
  "name", "location"
)

import org.apache.spark.sql.functions._

val dfAverage = df1.join(df2, Seq("name")).
  groupBy(df2("location")).agg(avg(df1("salary")).as("average")).
  select("location", "average")

dfAverage.show
+---------+-------+
| location|average|
+---------+-------+
|Bangalore|40000.0|
|  Chennai|50000.0|
+---------+-------+
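For reference, the RDD approach from the question can also be made to work. Here's a minimal sketch (assuming the same file paths as in the question) that fixes the syntax errors and uses `aggregateByKey` to accumulate a per-location sum and count:

```scala
// Corrected sketch of the question's RDD approach (assumes a SparkContext `sc`).
val salary   = sc.textFile("File1.csv").map(_.split(",").map(_.trim))
val location = sc.textFile("File2.csv").map(_.split(",").map(_.trim))

// Join (name, salary) with (name, location) -> (name, (salary, location))
val joined = salary.map(e => (e(0), e(3).toDouble))
  .join(location.map(e => (e(0), e(1))))

// Re-key by location, then average via a (sum, count) accumulator
val avgByLocation = joined
  .map { case (_, (sal, loc)) => (loc, sal) }
  .aggregateByKey((0.0, 0))(
    (acc, s) => (acc._1 + s, acc._2 + 1),
    (a, b)   => (a._1 + b._1, a._2 + b._2))
  .mapValues { case (sum, count) => sum / count }

avgByLocation.collect
// averages: Bangalore -> 40000.0, Chennai -> 50000.0
```

Note that `sortByKey` and `repartition(1)` from the original attempt aren't needed for the averaging itself; the DataFrame version above is the more idiomatic choice.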

[UPDATE] Computing average dimensions:

// file1.csv:
Ram,30,Engineer,40000,600*200
Bala,27,Doctor,30000,800*400
Hari,33,Engineer,50000,700*300
Siva,35,Doctor,60000,600*200

// file2.csv
Hari,Bangalore
Ram,Chennai
Bala,Bangalore
Siva,Chennai

val df1 = spark.read.csv("/path/to/file1.csv").toDF(
  "name", "age", "title", "salary", "dimensions"
)

val df2 = spark.read.csv("/path/to/file2.csv").toDF(
  "name", "location"
)

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType

val dfAverage = df1.join(df2, Seq("name")).
  groupBy(df2("location")).
  agg(
    avg(split(df1("dimensions"), ("\\*")).getItem(0).cast(IntegerType)).as("avg_length"),
    avg(split(df1("dimensions"), ("\\*")).getItem(1).cast(IntegerType)).as("avg_width")
  ).
  select(
    $"location", $"avg_length", $"avg_width",
    concat($"avg_length", lit("*"), $"avg_width").as("avg_dimensions")
  )

dfAverage.show
+---------+----------+---------+--------------+
| location|avg_length|avg_width|avg_dimensions|
+---------+----------+---------+--------------+
|Bangalore|     750.0|    350.0|   750.0*350.0|
|  Chennai|     600.0|    200.0|   600.0*200.0|
+---------+----------+---------+--------------+
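If the trailing `.0` in `avg_dimensions` is unwanted, one option (a sketch building on the `dfAverage` above; `round` here is an added choice, not part of the original answer) is to round the averages to integers before concatenating:

```scala
// Hedged variant: round each average and cast to Int before concatenation,
// so e.g. 750.0*350.0 becomes 750*350. Assumes the imports of
// org.apache.spark.sql.functions._ and IntegerType from the snippet above.
val dfRounded = dfAverage.select(
  $"location",
  concat(
    round($"avg_length").cast(IntegerType), lit("*"),
    round($"avg_width").cast(IntegerType)
  ).as("avg_dimensions")
)
```

`concat` implicitly casts its non-string arguments to strings, so no explicit string cast is needed.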