小编Tza*_*har的帖子

Scala-Spark使用参数值动态调用groupby和agg

我想编写自定义分组和聚合函数来获取用户指定的列名和用户指定的聚合映射.我不知道前面的列名和聚合映射.我想写一个类似下面的函数.但我是Scala的新手,我无法解决它.

def groupAndAggregate(df: DataFrame,  aggregateFun: Map[String, String], cols: List[String] ): DataFrame ={
  val grouped = df.groupBy(cols)
  val aggregated = grouped.agg(aggregateFun)
  aggregated.show()
}
Run Code Online (Sandbox Code Playgroud)

并希望称之为

val listOfStrings =  List("A", "B", "C")
val result = groupAndAggregate(df, Map("D"-> "SUM", "E"-> "COUNT"), listOfStrings)
Run Code Online (Sandbox Code Playgroud)

我怎样才能做到这一点?任何人都可以帮助我.

customization scala group-by aggregate apache-spark

4
推荐指数
1
解决办法
3851
查看次数

如何从spark中的嵌套结构类型中提取列名和数据类型

如何从spark中的嵌套结构类型中提取列名和数据类型

模式变得像这样:

(events,StructType(
   StructField(beaconType,StringType,true),     
   StructField(beaconVersion,StringType,true), 
   StructField(client,StringType,true), 
   StructField(data,StructType(
      StructField(ad,StructType(
         StructField(adId,StringType,true)
      )
   )
)
Run Code Online (Sandbox Code Playgroud)

我想转换成下面的格式

Array[(String, String)] = Array(
  (client,StringType), 
  (beaconType,StringType), 
  (beaconVersion,StringType), 
  (phase,StringType)
Run Code Online (Sandbox Code Playgroud)

你能帮忙吗

scala apache-spark

4
推荐指数
1
解决办法
2万
查看次数

如何从scala中的RDD中获取最早的时间戳日期

我有一个RDD就好((String, String), TimeStamp).我有大量的记录,我想为每个键选择具有最新TimeStamp值的记录.我已经尝试了以下代码,仍然在努力解决这个问题.任何人都可以帮我这样做吗?

我尝试下面的代码是错误的,不能正常工作

val context = sparkSession.read.format("jdbc")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("url", url)
  .option("dbtable", "student_risk")
  .option("user", "user")
  .option("password", "password")
  .load()
context.cache();

val studentRDD = context.rdd.map(r => ((r.getString(r.fieldIndex("course_id")), r.getString(r.fieldIndex("student_id"))), r.getTimestamp(r.fieldIndex("risk_date_time"))))
val filteredRDD = studentRDD.collect().map(z => (z._1, z._2)).reduce((x, y) => (x._2.compareTo(y._2)))
Run Code Online (Sandbox Code Playgroud)

scala mapreduce apache-spark

3
推荐指数
1
解决办法
1341
查看次数

如何在scala代码中使用系统IP地址?

我想使用 scala 在变量中设置 ipaddress。我试过下面的场景。我没有得到我想要的东西。

val sysip = System.InetAddress.getLocalHost();
Run Code Online (Sandbox Code Playgroud)

你能帮忙吗?

scala

3
推荐指数
1
解决办法
3991
查看次数

在spark-sql中执行SQL语句

我有一个格式如下的文本文件:

ID,Name,Rating
1,A,3
2,B,4
1,A,4
Run Code Online (Sandbox Code Playgroud)

我想找到 Spark 中每个 ID 的平均评分。这是我到目前为止的代码,但它不断给我一个错误:

val Avg_data=spark.sql("select ID, AVG(Rating) from table")
Run Code Online (Sandbox Code Playgroud)

错误:org.apache.sapk.sql.AnalysisException:分组表达式序列为空,并且 'table'.'ID' 不是聚合函数。将 '(avg(CAST(table.'Rating' AS BIGINT)) 包装为 'avg(Rating)')' 在窗口函数中.........

scala apache-spark apache-spark-sql

3
推荐指数
1
解决办法
2万
查看次数

如何检查文件是否存在于 Groovy 脚本中

Workbook aWorkBook = Workbook.getWorkbook(new File("C:\\Users\\Response.xls"));     
WritableWorkbook workbook1 = Workbook.createWorkbook(new File("C:\\Users\\Responses.xls"), aWorkBook);
Run Code Online (Sandbox Code Playgroud)

我正在 SOAP UI 中使用 Groovy 脚本进行数据驱动测试。以上是创建新文件然后将 PASS 或 FAIL 结果写入该 excel 的代码部分。假设如果有 5 个测试用例,那么我只想在第一个循环(第一个测试用例)中创建一次新文件,然后在下一个循环中它应该打开现有文件。但目前它正在每个循环中创建新文件并覆盖数据和 PASS 结果仅显示最后一个测试用例。任何人都可以帮助解决这个问题吗?

groovy data-driven soapui data-driven-tests

3
推荐指数
1
解决办法
2万
查看次数

如何正确打印rdd

对不起,我是一个新的学习者,现在我想以正确的格式打印一个rdd,但结果是这样的:

(200412169,([Ljava.lang.String;@7515eb2d,[Ljava.lang.String;@72031368))
(200412169,([Ljava.lang.String;@7515eb2d,[Ljava.lang.String;@27ef4b52))
Run Code Online (Sandbox Code Playgroud)

我的rdd是

Array[(String, (Array[String], Array[String]))] =
  Array(
    (200412169,(Array(gavin),Array(1, 24, 60, 85, 78))), 
    (200412169,(Array(gavin),Array(2, 22, 20, 85, 78))), 
    (200412166,(Array(gavin3),Array(1, 54, 80, 78, 98))), 
  )
Run Code Online (Sandbox Code Playgroud)

我想打印出来:

200412169       gavin   2       22      20      85      78
200412169       gavin   1       24      60      85      78
Run Code Online (Sandbox Code Playgroud)

有人可以帮助我,非常感谢.

scala apache-spark

2
推荐指数
1
解决办法
3513
查看次数

Flatmap scala [String,String,List [String]]

我有这个prbolem,我有一个RDD[(String,String, List[String]),我想"flatmap"它获得一个RDD[(String,String, String)]:

例如:

val x :RDD[(String,String,  List[String]) = 
RDD[(a,b, list[ "ra", "re", "ri"])]
Run Code Online (Sandbox Code Playgroud)

我想得到:

val result: RDD[(String,String,String)] = 
RDD[(a, b, ra),(a, b, re),(a, b, ri)])]
Run Code Online (Sandbox Code Playgroud)

scala extract flatmap apache-spark rdd

2
推荐指数
1
解决办法
593
查看次数

值连接不是org.apache.spark.rdd.RDD [(Long,T)]的成员

这个函数似乎对我的IDE有效:

def zip[T, U](rdd1:RDD[T], rdd2:RDD[U]) : RDD[(T,U)] = {
    rdd1
      .zipWithIndex
      .map(_.swap)
      .join(
        rdd2
          .zipWithIndex
          .map(_.swap))
      .values
}
Run Code Online (Sandbox Code Playgroud)

但是当我编译时,我得到:

value join不是org.apache.spark.rdd.RDD [(Long,T)]的成员可能的原因:在`value join'之前可能缺少分号?.加入(

我在Spark 1.6中,我已经尝试导入org.apache.spark.rdd.RDD._ 并且函数内部的代码在函数定义之外的两个RDD上直接使用时效果很好.

任何的想法 ?

scala apache-spark

1
推荐指数
1
解决办法
976
查看次数

Scala迭代器令人困惑

单击此处查看问题声明图像

我非常努力地理解为什么迭代器的行为如此.我的意思是表演一次

result = lines.filter(_.nonEmpty).map(_.toInt)
Run Code Online (Sandbox Code Playgroud)

迭代器缓冲区覆盖了除最后一个元素之外的所有elemnets.

我的意思是如果在给出5次之后我的输入文本文件中有5个元素

result = lines.filter(_.nonEmpty).map(_.toInt)
Run Code Online (Sandbox Code Playgroud)

我的迭代器变空了.

非常感谢任何帮助....在此先感谢

iteration iterator scala

1
推荐指数
1
解决办法
112
查看次数