I have a JSON object "{"id":1,"name":"OZKA","birthDate":"1981-02-08T20:00:00.000Z","monthRevenue":1000.75,"developer":true}" and this code:
println(request.getParameter("content"))//{"id":1,"name":"OZKA","birthDate":"1981-02-08T20:00:00.000Z","monthRevenue":1000.75,"developer":true}
val result = scala.util.parsing.json.JSON.parseFull(request.getParameter("content"))
result match {
case Some(e) => { println(e); //output: Map(name -> OZKA, monthRevenue -> 1000.75, developer -> true, birthDate -> 1981-02-08T20:00:00.000Z, id -> 1.0)
e.foreach((key: Any, value: Any) => {println(key + ":" + value)})
}
case None => println("Failed.")
}
When I try to call the map or foreach function, the compiler throws the error "value foreach is not a member of Any". Can anyone suggest a way to parse this JSON string and convert it to Scala types?
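Context
I am using Spark 1.5.
I have a file records.txt that is Ctrl-A delimited; in that file, index 31 holds subscriber_id. For some records subscriber_id is empty.
Record with a non-empty subscriber_id. Here subscriber_id (UK8jikahasjp23) sits just before the last attribute: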
99^A2013-12-11^A23421421412^qweqweqw2222^A34232432432^A365633049^A1^A6yudgfdhaf9923^AAC^APrimary DTV^AKKKR DATA+ PVR3^AGrundig^AKKKR PVR3^AKKKR DATA+ PVR3^A127b146^APVR3^AYes^ANo^ANo^ANo^AYes^AYes^ANo^A2017-08-07 21:27:30.000000^AYes^ANo^ANo^A6yudgfdhaf9923^A7290921396551747605^A2013-12-11 16:00:03.000000^A7022497306379992936^AUK8jikahasjp23^A
Record with an empty subscriber_id:
23^A2013-12-11^A23421421412^qweqweqw2222^A34232432432^A365633049^A1^A6yudgfdhaf9923^AAC^APrimary DTV^AKKKR DATA+ PVR3^AGrundig^AKKKR PVR3^AKKKR DATA+ PVR3^A127b146^APVR3^AYes^ANo^ANo^ANo^AYes^AYes^ANo^A2017-08-07 21:27:30.000000^AYes^ANo^ANo^A6yudgfdhaf9923^A7290921396551747605^A2013-12-11 16:00:03.000000^A7022497306379992936^A^A
Problem
I get a java.lang.ArrayIndexOutOfBoundsException for records with an empty subscriber_id.
Why does Spark throw java.lang.ArrayIndexOutOfBoundsException for an empty value in the subscriber_id field?
16/08/20 10:22:18 WARN scheduler.TaskSetManager: Lost task 31.0 in stage 8.0: java.lang.ArrayIndexOutOfBoundsException: 31
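One thing worth checking, assuming the truncated code reads element 31 of the split array: Java's `String.split(regex)`, which Scala strings use, discards trailing empty strings, so a record ending in `^A^A` yields a shorter array than one with a value in the last position. The sketch below is plain Python, not Spark; `java_style_split` is a hypothetical helper that imitates that behaviour, and the three-field records are made up for illustration.

```python
def java_style_split(line, sep, limit=0):
    """Rough imitation of Java's String.split for a literal separator.

    limit == 0 mimics split(regex): trailing empty strings are dropped.
    limit < 0 mimics split(regex, -1): trailing empty strings are kept.
    Hypothetical helper for illustration; positive limits are not modelled.
    """
    parts = line.split(sep)
    if limit == 0 and line != "":
        # Drop every empty string at the end of the list.
        while parts and parts[-1] == "":
            parts.pop()
    return parts


SEP = "\x01"  # Ctrl-A

# Made-up three-field records with a trailing delimiter, like the samples.
full = SEP.join(["99", "2013-12-11", "UK8jikahasjp23"]) + SEP
empty = SEP.join(["23", "2013-12-11", ""]) + SEP

print(len(java_style_split(full, SEP)))       # last field present
print(len(java_style_split(empty, SEP)))      # last field empty: array is shorter
print(len(java_style_split(empty, SEP, -1)))  # negative limit keeps empty fields
```

If this is the cause, passing a negative limit in Scala, as in `elem.split("\\u0001", -1)`, keeps the trailing empty fields so that the index still exists.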
case class CustomerCard(accountNumber:String, subscriber_id:String,subscriptionStatus:String )
object CustomerCardProcess {
val log = LoggerFactory.getLogger(this.getClass.getName)
def doPerform(sc: SparkContext, sqlContext: HiveContext, custCardRDD: RDD[String]): DataFrame = {
import sqlContext.implicits._
log.info("doCustomerCardProcess method started")
val splitRDD = custCardRDD.map(elem => elem.split("\\u0001"))
val schemaRDD …
My "asdasd.csv" file has the following structure.
Index,Arrival_Time,Creation_Time,x,y,z,User,Model,Device,gt
0,1424696633908,1424696631913248572,-5.958191,0.6880646,8.135345,a,nexus4,nexus4_1,stand
1,1424696633909,1424696631918283972,-5.95224,0.6702118,8.136536,a,nexus4,nexus4_1,stand
2,1424696633918,1424696631923288855,-5.9950867,0.6535491999999999,8.204376,a,nexus4,nexus4_1,stand
3,1424696633919,1424696631928385290,-5.9427185,0.6761626999999999,8.128204,a,nexus4,nexus4_1,stand
I build the following {key, value} tuples to operate on:
#                                 x          y          z
[(('a', 'nexus4', 'stand'), ((-5.958191, 0.6880646, 8.135345)))]
#      part A (key)                  part B (value)
My code for computing the mean is below; I need the mean of each of the columns x, y and z for every key.
rdd_ori = sc.textFile("asdasd.csv") \
.map(lambda x: ((x.split(",")[6], x.split(",")[7], x.split(",")[9]),(float(x.split(",")[3]),float(x.split(",")[4]),float(x.split(",")[5]))))
meanRDD = rdd_ori.mapValues(lambda x: (x,1)) \
.reduceByKey(lambda a, b: (a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1]))\
.mapValues(lambda a : (a[0]/a[3], a[1]/a[3],a[2]/a[3]))
My problem is that I tried this code and it runs fine on another PC, with the same VM I used to develop it (PySpark, Py3).
Here is an example showing that the code is correct:
But I don't know why I get this error; the important part is in bold.
--------------------------------------------------------------------------- Py4JJavaError Traceback (most recent call last) in …
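The traceback above is cut off, so the cause of the error cannot be confirmed from it. One thing worth checking in the averaging code is the shape of the accumulator: `mapValues(lambda x: (x, 1))` produces `((x, y, z), 1)`, but the reducer returns a flat `(sx, sy, sz, n)`, so the reducer sees a different shape as soon as it is applied to its own output, and a key with a single record never becomes the 4-tuple that the final `mapValues` indexes. The sketch below is plain Python without Spark; `to_acc`, `merge`, `finish` and `mean_by_key` are hypothetical names and the sample values are made up. It keeps one shape, `(sum_x, sum_y, sum_z, count)`, from start to finish.

```python
from collections import defaultdict
from functools import reduce


def to_acc(value):
    """Turn one (x, y, z) record into an accumulator (sum_x, sum_y, sum_z, count)."""
    x, y, z = value
    return (x, y, z, 1)


def merge(a, b):
    """Combine two accumulators; input and output have the same shape."""
    return (a[0] + b[0], a[1] + b[1], a[2] + b[2], a[3] + b[3])


def finish(acc):
    """Divide each sum by the count to get the per-column mean."""
    return (acc[0] / acc[3], acc[1] / acc[3], acc[2] / acc[3])


def mean_by_key(pairs):
    """Local stand-in for mapValues + reduceByKey + mapValues."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(to_acc(value))
    return {key: finish(reduce(merge, accs)) for key, accs in grouped.items()}


# Made-up records: three for one key, a single record for another.
pairs = [
    (("a", "nexus4", "stand"), (1.0, 2.0, 3.0)),
    (("a", "nexus4", "stand"), (3.0, 4.0, 5.0)),
    (("a", "nexus4", "stand"), (5.0, 6.0, 7.0)),
    (("b", "nexus4", "sit"), (2.0, 2.0, 2.0)),
]
print(mean_by_key(pairs))
```

On an RDD the same three functions would slot into `mapValues(to_acc)`, `reduceByKey(merge)` and `mapValues(finish)`.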
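I am having a lot of trouble with the Row class in Spark. It seems to me that Row is a really badly designed class. Extracting a value from a Row should be no harder than extracting a value from a Scala List; but in practice you have to know the exact type of the column in order to extract it. You can't even turn a column into a String; how ridiculous is that for a great framework like Spark? In the real world, in most cases you don't know the exact type of the column, and on top of that, in many cases you have dozens or hundreds of columns. Below is an example showing the ClassCastExceptions I get.
Does anyone have a solution for easily extracting values from a Row?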
scala> val df = List((1,2),(3,4)).toDF("col1","col2")
df: org.apache.spark.sql.DataFrame = [col1: int, col2: int]
scala> df.first.getAs[String]("col1")
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
... 56 elided
scala> df.first.getAs[Int]("col1")
res12: Int = 1
scala> df.first.getInt(0)
res13: Int = 1
scala> df.first.getLong(0)
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:105)
at org.apache.spark.sql.Row$class.getLong(Row.scala:231)
at org.apache.spark.sql.catalyst.expressions.GenericRow.getLong(rows.scala:165)
... 56 elided
scala> df.first.getFloat(0)
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Float …
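I am working on a project that deals with some nested JSON data with a complicated schema/data structure. Basically, what I want to do is reduce one of the columns in a DataFrame so that it holds only the last element of its array. I am completely stuck on how to do this. I hope this makes sense.
Below is an example of what I am trying to accomplish: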
val singersDF = Seq(
("beatles", "help,hey,jude"),
("romeo", "eres,mia"),
("elvis", "this,is,an,example")
).toDF("name", "hit_songs")
val actualDF = singersDF.withColumn(
"hit_songs",
split(col("hit_songs"), "\\,")
)
actualDF.show(false)
actualDF.printSchema()
+-------+-----------------------+
|name |hit_songs |
+-------+-----------------------+
|beatles|[help, hey, jude] |
|romeo |[eres, mia] |
|elvis |[this, is, an, example]|
+-------+-----------------------+
root
|-- name: string (nullable = true)
|-- hit_songs: array (nullable = true)
| |-- element: string (containsNull = true)
The end goal is the following output, which selects the last "string" in the hit_songs array.
I am not worried about what the schema looks like afterwards.
+-------+---------+
|name |hit_songs|
+-------+---------+
|beatles|jude |
|romeo |mia |
|elvis |example |
+-------+---------+
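The transformation asked for can be sketched in plain Python, outside Spark: split each comma-separated string and keep the final piece. `last_song` is a hypothetical helper, and the rows mirror the sample data above.

```python
def last_song(hit_songs):
    """Return the last comma-separated item of the string."""
    return hit_songs.split(",")[-1]


rows = [
    ("beatles", "help,hey,jude"),
    ("romeo", "eres,mia"),
    ("elvis", "this,is,an,example"),
]

# Keep the name and replace the song list with its last entry.
result = [(name, last_song(songs)) for name, songs in rows]
print(result)
```

In Spark 2.4 and later, `element_at(col("hit_songs"), -1)` from `org.apache.spark.sql.functions` is one way to express the same selection on the array column.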
My input Spark DataFrame, named df, is:
+---------------+----------------+-----------------------+
|Main_CustomerID|126+ Concentrate|2.5 Ethylhexyl_Acrylate|
+---------------+----------------+-----------------------+
| 725153| 3.0| 2.0|
| 873008| 4.0| 1.0|
| 625109| 1.0| 0.0|
+---------------+----------------+-----------------------+
I need to remove the special characters from the column names of df as follows:
remove +
replace space with underscore
replace dot with underscore
So my df should look like:
+---------------+---------------+-----------------------+
|Main_CustomerID|126_Concentrate|2_5_Ethylhexyl_Acrylate|
+---------------+---------------+-----------------------+
| 725153| 3.0| 2.0|
| 873008| 4.0| 1.0|
| 625109| 1.0| 0.0|
+---------------+---------------+-----------------------+
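The three renaming rules above can be sketched in plain Python on the list of column names, assuming the second rule refers to spaces (which is what the expected header suggests). `clean_column_name` is a hypothetical helper, not part of any Spark API.

```python
import re


def clean_column_name(name):
    """Drop '+', then turn every space or dot into an underscore."""
    without_plus = name.replace("+", "")
    return re.sub(r"[ .]", "_", without_plus)


columns = ["Main_CustomerID", "126+ Concentrate", "2.5 Ethylhexyl_Acrylate"]
renamed = [clean_column_name(c) for c in columns]
print(renamed)
```

In PySpark the cleaned list could then be applied in one call with `df.toDF(*renamed)`; in Scala, `df.toDF(renamed: _*)` plays the same role and avoids renaming one column at a time.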
Using Scala, I have done this:
var tableWithColumnsRenamed = df
for (field <- tableWithColumnsRenamed.columns) {
tableWithColumnsRenamed = tableWithColumnsRenamed
.withColumnRenamed(field, field.replaceAll("\\.", "_"))
}
for (field <- tableWithColumnsRenamed.columns) {
tableWithColumnsRenamed = tableWithColumnsRenamed
.withColumnRenamed(field, field.replaceAll("\\+", …
I am reading a CSV file with the following code:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.master("local[2]") \
.getOrCreate()
Now there are four different options for reading it:
df = spark.read.load("/..../xyz.csv")
df = spark.read.csv("/..../xyz.csv")
df = spark.read.format('csv').load("/..../xyz.csv")
df = spark.read.option().csv("/..../xyz.csv")
Which option should I use?
Edit:
Also, both inferSchema="true" and inferSchema=True work. Can we use either of them blindly?
Suppose I have a Dataset/DataFrame with the following content:
name, marks1, marks2
Alice, 10, 20
Bob, 20, 30
I want to add a new column that holds the average of columns B and C (marks1 and marks2).
Expected result:
name, marks1, marks2, Result(Avg)
Alice, 10, 20, 15
Bob, 20, 30, 25
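For a sum or any other arithmetic operation I use df.withColumn("xyz", $"marks1"+$"marks2"). I cannot find a similar way to compute the average. Please help.
Also, the number of columns is not fixed: sometimes it may be the average of 2 columns, sometimes of 3 or even more. So I want generic code that works in all these cases.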
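The generic version described above amounts to folding the chosen columns with `+` and dividing by how many there are. The sketch below shows that idea in plain Python on dictionaries rather than on a Spark DataFrame; `with_average` and its `result_name` parameter are hypothetical names chosen for illustration.

```python
from functools import reduce
from operator import add


def with_average(rows, columns, result_name="Result(Avg)"):
    """Return copies of the rows with the mean of the given columns added."""
    out = []
    for row in rows:
        # Fold the selected values with +, then divide by the column count.
        total = reduce(add, (row[c] for c in columns))
        out.append({**row, result_name: total / len(columns)})
    return out


rows = [
    {"name": "Alice", "marks1": 10, "marks2": 20},
    {"name": "Bob", "marks1": 20, "marks2": 30},
]

for row in with_average(rows, ["marks1", "marks2"]):
    print(row)
```

The same fold should carry over to Spark column expressions: in Scala, given a list of column names `cols`, `cols.map(col).reduce(_ + _) / cols.size` builds a single Column that can be passed to `withColumn`, whatever the number of columns.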
I am writing a DataFrame to HDFS using the following code:
final_df.write.format("com.databricks.spark.csv").option("header", "true").save("path_to_hdfs")
It gives me the following error:
Caused by: java.lang.NumberFormatException: For input string: "124085346080"
The full stack trace is as follows:
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:261)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NumberFormatException: For input string: "124085346080"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:583)
at java.lang.Integer.parseInt(Integer.java:615)
at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:272)
at scala.collection.immutable.StringOps.toInt(StringOps.scala:29)
at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$.castTo(CSVInferSchema.scala:241)
at org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$csvParser$3.apply(CSVRelation.scala:116)
at org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$csvParser$3.apply(CSVRelation.scala:85)
at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1$$anonfun$apply$2.apply(CSVFileFormat.scala:128)
at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1$$anonfun$apply$2.apply(CSVFileFormat.scala:127)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:253)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252) …
How do I create tuples from the following existing RDD?
// reading a text file "b.txt" and creating RDD
val rdd = sc.textFile("/home/training/desktop/b.txt")
b.txt dataset:
Ankita,26,BigData,newbie
Shikha,30,Management,Expert
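A minimal sketch of the line-to-tuple step, in plain Python rather than on an RDD. The field names `name`, `age`, `field` and `level` are assumptions about what the four columns mean, and `to_tuple` is a hypothetical helper.

```python
def to_tuple(line):
    """Split one comma-separated line into a 4-tuple, parsing the age as int."""
    name, age, field, level = line.split(",")
    return (name, int(age), field, level)


lines = ["Ankita,26,BigData,newbie", "Shikha,30,Management,Expert"]
tuples = [to_tuple(line) for line in lines]
print(tuples)
```

On an RDD the same function could be passed to `map`; in Scala the equivalent would split each line and build a tuple from the elements of the resulting array.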