假设我有一个Spark DF,我想将其保存到磁盘CSV文件.在Spark 2.0.0+中,可以转换DataFrame(DataSet[Rows])为a DataFrameWriter并使用该.csv方法来编写文件.
该功能定义为
def csv(path: String): Unit
path : the location/folder name and not the file name.
Run Code Online (Sandbox Code Playgroud)
Spark将csv文件存储在指定位置,方法是创建名称为part - *.csv的CSV文件.
有没有办法用指定的文件名而不是部分保存CSV - *.csv?或者可以指定前缀而不是part-r?
代码:
df.coalesce(1).write.csv("sample_path")
Run Code Online (Sandbox Code Playgroud)
电流输出:
sample_path
|
+-- part-r-00000.csv
Run Code Online (Sandbox Code Playgroud)
期望的输出:
sample_path
|
+-- my_file.csv
Run Code Online (Sandbox Code Playgroud)
注意: coalesce函数用于输出单个文件,执行程序有足够的内存来收集DF而没有内存错误.
我正在尝试使用基于DataFrame/Dataset API的Spark-Streaming来加载来自Kafka的数据流的结构化流方法.
我用:
Spark Kafka DataSource定义了底层架构:
|key|value|topic|partition|offset|timestamp|timestampType|
Run Code Online (Sandbox Code Playgroud)
我的数据采用json格式,并存储在值列中.我正在寻找一种方法如何从值列中提取底层模式并将接收到的数据帧更新为存储在值中的列?我尝试了下面的方法,但它不起作用:
val columns = Array("column1", "column2") // column names
val rawKafkaDF = sparkSession.sqlContext.readStream
.format("kafka")
.option("kafka.bootstrap.servers","localhost:9092")
.option("subscribe",topic)
.load()
val columnsToSelect = columns.map( x => new Column("value." + x))
val kafkaDF = rawKafkaDF.select(columnsToSelect:_*)
// some analytics using stream dataframe kafkaDF
val query = kafkaDF.writeStream.format("console").start()
query.awaitTermination()
Run Code Online (Sandbox Code Playgroud)
在这里我得到了Exception,org.apache.spark.sql.AnalysisException: Can't extract value from value#337;因为在创建流时,里面的值是未知的...
你有什么建议吗?
scala apache-kafka apache-spark apache-spark-sql spark-structured-streaming