我只是想知道Apache Spark中的RDD和DataFrame (Spark 2.0.0 DataFrame只是一个类型别名Dataset[Row])有什么区别?
你能把一个转换成另一个吗?
我有一些火花代码来处理csv文件.它对它做了一些改造.我现在想将此RDD保存为csv文件并添加标头.此RDD的每一行都已正确格式化.
我不知道该怎么做.我想用标题字符串和我的RDD进行联合但是标题字符串不是RDD所以它不起作用.
我一直在玩转换RDD到DataFrames然后再回来.首先,我有一个名为dataPair的类型(Int,Int)的RDD.然后我创建了一个带有列标题的DataFrame对象:
val dataFrame = dataPair.toDF(header(0), header(1))
Run Code Online (Sandbox Code Playgroud)
然后我使用以下命令将其从DataFrame转换回RDD:
val testRDD = dataFrame.rdd
Run Code Online (Sandbox Code Playgroud)
返回类型为org.apache.spark.sql.Row的RDD(不是(Int,Int)).然后我想用.toDF将它转换回RDD,但是我收到一个错误:
error: value toDF is not a member of org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
Run Code Online (Sandbox Code Playgroud)
我已经尝试为testRDD定义类型Data(Int,Int)的Schema,但是我得到了类型不匹配的异常:
error: type mismatch;
found : org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
required: org.apache.spark.rdd.RDD[Data]
val testRDD: RDD[Data] = dataFrame.rdd
^
Run Code Online (Sandbox Code Playgroud)
我已经进口了
import sqlContext.implicits._
Run Code Online (Sandbox Code Playgroud) 我正在Scala中实现Spark Streaming,我从Kafka主题中提取JSON字符串,并希望将它们加载到数据帧中.有没有办法做到这一点,Spark从RDD [String]中推断出自己的架构?
我试图将Spark RDD转换为Spark SQL数据帧toDF().我已成功多次使用此函数,但在这种情况下,我收到编译器错误:
error: value toDF is not a member of org.apache.spark.rdd.RDD[com.example.protobuf.SensorData]
Run Code Online (Sandbox Code Playgroud)
这是我的代码如下:
// SensorData is an auto-generated class
import com.example.protobuf.SensorData
def loadSensorDataToRdd : RDD[SensorData] = ???
object MyApplication {
def main(argv: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("My application")
conf.set("io.compression.codecs", "com.hadoop.compression.lzo.LzopCodec")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val sensorDataRdd = loadSensorDataToRdd()
val sensorDataDf = sensorDataRdd.toDF() // <-- CAUSES COMPILER ERROR
}
}
Run Code Online (Sandbox Code Playgroud)
我猜测问题出在SensorData类上,这是一个从协议缓冲区自动生成的Java类.为了将RDD转换为数据帧,我该怎么办?
我试图从RDD创建一个Dataframe [cassandraRow] ..但我不能因为createDataframe(RDD [Row],schema:StructType)需要RDD [Row]而不是RDD [cassandraRow].
并且根据这个问题的答案 如何将rdd对象转换为spark中的dataframe
(其中一个答案)建议在RDD [Row]上使用toDF()从RDD获取Dataframe,这对我不起作用.我尝试在另一个例子中使用RDD [Row](尝试使用toDF()).
以下是有趣的:
val rddSTG = sc.parallelize(
List ( ("RTD","ANT","SOYA BEANS", "20161123", "20161123", 4000, "docid11", null, 5) ,
("RTD","ANT","SOYA BEANS", "20161124", "20161123", 6000, "docid11", null, 4) ,
("RTD","ANT","BANANAS", "20161124", "20161123", 7000, "docid11", null, 9) ,
("HAM","ANT","CORN", "20161123", "20161123", 1000, "docid22", null, 33),
("LIS","PAR","BARLEY", "20161123", "20161123", 11111, "docid33", null, 44)
)
)
val dataframe = rddSTG.toDF("ORIG", "DEST", "PROD", "PLDEPDATE", "PLARRDATE", "PLCOST", "docid", "ACTARRDATE", "mutationseq")
dataframe.createOrReplaceTempView("STG")
spark.sql("SELECT * FROM STG ORDER BY PLDEPDATE DESC").show()
Run Code Online (Sandbox Code Playgroud)
它产生如下错误:
scala.MatchError: Null (of class scala.reflect.internal.Types$TypeRef$$anon$6)
Run Code Online (Sandbox Code Playgroud)
一旦我将其中一个null值更改为non-null,它的工作就会生效。我想我明白了,因为无法在现场进行推断,但这似乎很奇怪。有想法吗?
我有一个名为RowRDD的行RDD.我只是想转换成DataFrame.从我在互联网上看到的各个地方的例子,我看到我正在尝试RowRDD.toDF()我得到错误:
value toDF is not a member of org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]