小编Ram*_*jan的帖子

Json使用scala.util.parsing.json在Scala中解析

我有一个json对象"{"id":1,"name":"OZKA","birthDate":"1981-02-08T20:00:00.000Z","monthRevenue":1000.75,"developer":true}"和代码:

println(request.getParameter("content"))//{"id":1,"name":"OZKA","birthDate":"1981-02-08T20:00:00.000Z","monthRevenue":1000.75,"developer":true}
val result = scala.util.parsing.json.JSON.parseFull(request.getParameter("content"))
result match {
  case Some(e) => { println(e); //output: Map(name -> OZKA, monthRevenue -> 1000.75, developer -> true, birthDate -> 1981-02-08T20:00:00.000Z, id -> 1.0)
    e.foreach((key: Any, value: Any) => {println(key + ":" + value)})
  }
  case None => println("Failed.")
}
Run Code Online (Sandbox Code Playgroud)

,当我尝试调用map或foreach函数时,编译器抛出一个错误"值foreach不是Any的成员".任何人都可以建议我一个方法,我如何解析这个json字符串并将其转换为Scala类型

json scala

2
推荐指数
1
解决办法
1万
查看次数

为什么spark为空属性抛出ArrayIndexOutOfBoundsException预期?

上下文

我正在使用Spark 1.5.

我有一个文件records.txt,它是ctrl A分隔的,在该文件中,第31个索引是用于subscriber_id.对于某些记录,subscriber_id为空.subscriber_id记录不为空.

这里subscriber_id(UK8jikahasjp23)位于最后一个属性之前:

99^A2013-12-11^A23421421412^qweqweqw2222^A34232432432^A365633049^A1^A6yudgfdhaf9923^AAC^APrimary DTV^AKKKR DATA+ PVR3^AGrundig^AKKKR PVR3^AKKKR DATA+ PVR3^A127b146^APVR3^AYes^ANo^ANo^ANo^AYes^AYes^ANo^A2017-08-07 21:27:30.000000^AYes^ANo^ANo^A6yudgfdhaf9923^A7290921396551747605^A2013-12-11 16:00:03.000000^A7022497306379992936^AUK8jikahasjp23^A
Run Code Online (Sandbox Code Playgroud)

subscriber_id记录为空:

23^A2013-12-11^A23421421412^qweqweqw2222^A34232432432^A365633049^A1^A6yudgfdhaf9923^AAC^APrimary DTV^AKKKR DATA+ PVR3^AGrundig^AKKKR PVR3^AKKKR DATA+ PVR3^A127b146^APVR3^AYes^ANo^ANo^ANo^AYes^AYes^ANo^A2017-08-07 21:27:30.000000^AYes^ANo^ANo^A6yudgfdhaf9923^A7290921396551747605^A2013-12-11 16:00:03.000000^A7022497306379992936^A^A
Run Code Online (Sandbox Code Playgroud)

问题

我收到了带有空subscriber_id的记录的java.lang.ArrayIndexOutOfBoundsException.

为什么spark 为字段subscriber_id的空值抛出 java.lang.ArrayIndexOutOfBoundsException

16/08/20 10:22:18 WARN scheduler.TaskSetManager:阶段8.0中丢失的任务31.0:java.lang.ArrayIndexOutOfBoundsException:31

 case class CustomerCard(accountNumber:String, subscriber_id:String,subscriptionStatus:String )

     object CustomerCardProcess {
    val log = LoggerFactory.getLogger(this.getClass.getName)


   def doPerform(sc: SparkContext, sqlContext: HiveContext, custCardRDD: RDD[String]): DataFrame = {

    import sqlContext.implicits._
    log.info("doCustomerCardProcess method started")
     val splitRDD        =    custCardRDD.map(elem => elem.split("\\u0001"))
     val schemaRDD …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql

1
推荐指数
1
解决办法
2965
查看次数

Pyspark - TypeError:使用reduceByKey计算平均值时“float”对象不可下标

我的“asdasd.csv”文件具有以下结构。

 Index,Arrival_Time,Creation_Time,x,y,z,User,Model,Device,gt
0,1424696633908,1424696631913248572,-5.958191,0.6880646,8.135345,a,nexus4,nexus4_1,stand
1,1424696633909,1424696631918283972,-5.95224,0.6702118,8.136536,a,nexus4,nexus4_1,stand
2,1424696633918,1424696631923288855,-5.9950867,0.6535491999999999,8.204376,a,nexus4,nexus4_1,stand
3,1424696633919,1424696631928385290,-5.9427185,0.6761626999999999,8.128204,a,nexus4,nexus4_1,stand
Run Code Online (Sandbox Code Playgroud)

好的,我得到以下 {key,value} 元组来对其进行操作。

#                                 x           y        z
[(('a', 'nexus4', 'stand'), ((-5.958191, 0.6880646, 8.135345)))]
#           part A (key)               part B (value) 
Run Code Online (Sandbox Code Playgroud)

我计算平均值的代码如下,我必须计算每个键的每列 X、YZ 的平均值。

rdd_ori = sc.textFile("asdasd.csv") \
        .map(lambda x: ((x.split(",")[6], x.split(",")[7], x.split(",")[9]),(float(x.split(",")[3]),float(x.split(",")[4]),float(x.split(",")[5]))))

meanRDD = rdd_ori.mapValues(lambda x: (x,1)) \
            .reduceByKey(lambda a, b: (a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1]))\
            .mapValues(lambda a : (a[0]/a[3], a[1]/a[3],a[2]/a[3]))
Run Code Online (Sandbox Code Playgroud)

我的问题是我尝试了该代码,它在其他 PC 上运行良好,并且与我用于开发它的相同 MV (PySpark Py3)

这是一个例子,这段代码是正确的:

在此输入图像描述

但我不知道为什么会收到此错误,重要的部分在Strong中。

-------------------------------------------------- ------------------------- Py4JJavaError Traceback (最近一次调用) in …

python apache-spark pyspark

1
推荐指数
1
解决办法
4280
查看次数

Apache Spark:从行中提取值的问题

我在 Spark 中的 Row 类遇到了很多问题。在我看来 Row 类是一个真正设计糟糕的类。从 Row 中提取一个值应该并不比从 Scala 列表中提取一个值更困难;但实际上,您必须知道列的确切类型才能提取它。你甚至不能把列变成字符串;对于像 Spark 这样的伟大框架来说,这有多荒谬?在现实世界中,在大多数情况下,您不知道列的确切类型,而且在许多情况下,最重要的是,您有数十个或数百个列。下面是一个示例,向您展示我得到的 ClassCastExceptions。

有没有人有任何解决方案可以轻松地从 Row 中提取值?

scala> val df = List((1,2),(3,4)).toDF("col1","col2")
df: org.apache.spark.sql.DataFrame = [col1: int, col2: int]


scala> df.first.getAs[String]("col1")
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
  ... 56 elided

scala> df.first.getAs[Int]("col1")
res12: Int = 1

scala> df.first.getInt(0)
res13: Int = 1

scala> df.first.getLong(0)
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
  at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:105)
  at org.apache.spark.sql.Row$class.getLong(Row.scala:231)
  at org.apache.spark.sql.catalyst.expressions.GenericRow.getLong(rows.scala:165)
  ... 56 elided

scala> df.first.getFloat(0)
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Float …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql spark-dataframe apache-spark-dataset

1
推荐指数
1
解决办法
2680
查看次数

选择DataFrame中数组的最后一个元素

我正在一个项目上,正在处理具有复杂模式/数据结构的一些嵌套JSON日期。基本上,我想做的是在数据框中过滤掉其中的一列,以便选择数组中的最后一个元素。我完全坚持如何做到这一点。我希望这是有道理的。

以下是我要完成的示例:

val singersDF = Seq(
  ("beatles", "help,hey,jude"),
  ("romeo", "eres,mia"),
  ("elvis", "this,is,an,example")
).toDF("name", "hit_songs")

val actualDF = singersDF.withColumn(
  "hit_songs",
  split(col("hit_songs"), "\\,")
)

actualDF.show(false)
actualDF.printSchema() 

+-------+-----------------------+
|name   |hit_songs              |
+-------+-----------------------+
|beatles|[help, hey, jude]      |
|romeo  |[eres, mia]            |
|elvis  |[this, is, an, example]|
+-------+-----------------------+
root
 |-- name: string (nullable = true)
 |-- hit_songs: array (nullable = true)
 |    |-- element: string (containsNull = true)
Run Code Online (Sandbox Code Playgroud)

输出的最终目标将是以下内容,以选择hit_songs数组中的最后一个“字符串”。

我不担心之后的架构是什么样的。

+-------+---------+
|name   |hit_songs|
+-------+---------+
|beatles|jude     |
|romeo  |mia      |
|elvis  |example  |
+-------+---------+
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql

1
推荐指数
3
解决办法
2184
查看次数

替换Spark数据框中的列名称的特殊字符

我有我的输入spark-dataframe命名df为,

+---------------+----------------+-----------------------+
|Main_CustomerID|126+ Concentrate|2.5 Ethylhexyl_Acrylate|
+---------------+----------------+-----------------------+
|         725153|             3.0|                    2.0|
|         873008|             4.0|                    1.0|
|         625109|             1.0|                    0.0|
+---------------+----------------+-----------------------+
Run Code Online (Sandbox Code Playgroud)

我需要从以df下列名称中删除特殊字符,

  • 去掉 +

  • 替换为 underscore

  • 替换dotunderscore

所以我df应该像

+---------------+---------------+-----------------------+
|Main_CustomerID|126_Concentrate|2_5_Ethylhexyl_Acrylate|
+---------------+---------------+-----------------------+
|         725153|            3.0|                    2.0|
|         873008|            4.0|                    1.0|
|         625109|            1.0|                    0.0|
+---------------+---------------+-----------------------+
Run Code Online (Sandbox Code Playgroud)

使用Scala,我已经做到了,

var tableWithColumnsRenamed = df

for (field <- tableWithColumnsRenamed.columns) {
      tableWithColumnsRenamed = tableWithColumnsRenamed
        .withColumnRenamed(field, field.replaceAll("\\.", "_"))
    }
for (field <- tableWithColumnsRenamed.columns) {
      tableWithColumnsRenamed = tableWithColumnsRenamed
        .withColumnRenamed(field, field.replaceAll("\\+", …
Run Code Online (Sandbox Code Playgroud)

replace scala apache-spark-sql

1
推荐指数
2
解决办法
6086
查看次数

spark中不同的读取选项有什么区别?

我正在通过以下代码读取 csv 文件:-

    from pyspark.sql import SparkSession
    spark = SparkSession.builder \
            .master("local[2]") \
            .getOrCreate()
Run Code Online (Sandbox Code Playgroud)

现在有四种不同的阅读选项:

  1. df = spark.read.load("/..../xyz.csv")
  2. df = spark.read.csv("/..../xyz.csv")
  3. df = spark.read.format('csv').load("/..../xyz.csv")
  4. df = spark.read.option().csv("/..../xyz.csv")

我应该使用哪个选项?

编辑:-

此外,无论是inferSchema="true"inferSchema=True正在工作。我们可以盲目使用任何一种吗?

python csv apache-spark apache-spark-sql pyspark

1
推荐指数
2
解决办法
2172
查看次数

spark-计算2列或更多列中的平均值,并在每行中放入新列

假设我有一个包含以下内容的数据集/数据框:

name, marks1, marks2
Alice, 10, 20
Bob, 20, 30
Run Code Online (Sandbox Code Playgroud)

我想添加一个新列,该列应具有列B和C的平均值。

预期结果:-

name, marks1, marks2, Result(Avg)
Alice, 10, 20, 15
Bob, 20, 30, 25
Run Code Online (Sandbox Code Playgroud)

用于求和或任何其他算术运算df.withColumn("xyz", $"marks1"+$"marks2")。我找不到平均值的类似方法。请帮忙。

另外:-列数不是固定的。就像有时它可能是2列的平均值,有时是3列甚至更多列。所以我想要一个通用的代码,它应该可以工作。

apache-spark apache-spark-sql pyspark pyspark-sql

1
推荐指数
2
解决办法
5581
查看次数

将数据帧写入 HDFS 时出现 NumberFormatException 错误

我正在写信dataframeHDFS,使用以下代码

final_df.write.format("com.databricks.spark.csv").option("header", "true").save("path_to_hdfs")
Run Code Online (Sandbox Code Playgroud)

它给了我以下错误:

Caused by: java.lang.NumberFormatException: For input string: "124085346080"
Run Code Online (Sandbox Code Playgroud)

完整堆栈如下:

at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:261)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NumberFormatException: For input string: "124085346080"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Integer.parseInt(Integer.java:583)
    at java.lang.Integer.parseInt(Integer.java:615)
    at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:272)
    at scala.collection.immutable.StringOps.toInt(StringOps.scala:29)
    at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$.castTo(CSVInferSchema.scala:241)
    at org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$csvParser$3.apply(CSVRelation.scala:116)
    at org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$csvParser$3.apply(CSVRelation.scala:85)
    at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1$$anonfun$apply$2.apply(CSVFileFormat.scala:128)
    at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1$$anonfun$apply$2.apply(CSVFileFormat.scala:127)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:253)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252) …
Run Code Online (Sandbox Code Playgroud)

hadoop scala hdfs apache-spark pyspark

0
推荐指数
1
解决办法
1640
查看次数

SCALA:读取文本文件并创建它的元组

如何从以下存在的RDD创建元组?

// reading a text file "b.txt" and creating RDD 
val rdd = sc.textFile("/home/training/desktop/b.txt") 
Run Code Online (Sandbox Code Playgroud)

b.txt数据集 - >

 Ankita,26,BigData,newbie
 Shikha,30,Management,Expert
Run Code Online (Sandbox Code Playgroud)

scala tuples apache-spark

0
推荐指数
1
解决办法
1662
查看次数