相关疑难解决方法(0)

将DataFrame保存为CSV时指定文件名

假设我有一个Spark DF,我想将其保存到磁盘CSV文件.在Spark 2.0.0+中,可以转换DataFrame(DataSet[Rows])为a DataFrameWriter并使用该.csv方法来编写文件.

该功能定义为

def csv(path: String): Unit
    path : the location/folder name and not the file name.
Run Code Online (Sandbox Code Playgroud)

Spark将csv文件存储在指定位置,方法是创建名称为part - *.csv的CSV文件.

有没有办法用指定的文件名而不是部分保存CSV - *.csv?或者可以指定前缀而不是part-r?

代码:

df.coalesce(1).write.csv("sample_path")
Run Code Online (Sandbox Code Playgroud)

电流输出:

sample_path
|
+-- part-r-00000.csv
Run Code Online (Sandbox Code Playgroud)

期望的输出:

sample_path
|
+-- my_file.csv
Run Code Online (Sandbox Code Playgroud)

注意: coalesce函数用于输出单个文件,执行程序有足够的内存来收集DF而没有内存错误.

csv scala apache-spark pyspark

24
推荐指数
1
解决办法
3万
查看次数

如何使用结构化流媒体从Kafka读取JSON格式的记录?

我正在尝试使用基于DataFrame/Dataset API的Spark-Streaming来加载来自Kafka的数据流的结构化流方法.

我用:

  • 火花2.10
  • 卡夫卡0.10
  • 火花-SQL卡夫卡-0-10

Spark Kafka DataSource定义了底层架构:

|key|value|topic|partition|offset|timestamp|timestampType|
Run Code Online (Sandbox Code Playgroud)

我的数据采用json格式,并存储在列中.我正在寻找一种方法如何从值列中提取底层模式并将接收到的数据帧更新为存储在值中的列?我尝试了下面的方法,但它不起作用:

 val columns = Array("column1", "column2") // column names
 val rawKafkaDF = sparkSession.sqlContext.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers","localhost:9092")
  .option("subscribe",topic)
  .load()
  val columnsToSelect = columns.map( x => new Column("value." + x))
  val kafkaDF = rawKafkaDF.select(columnsToSelect:_*)

  // some analytics using stream dataframe kafkaDF

  val query = kafkaDF.writeStream.format("console").start()
  query.awaitTermination()
Run Code Online (Sandbox Code Playgroud)

在这里我得到了Exception,org.apache.spark.sql.AnalysisException: Can't extract value from value#337;因为在创建流时,里面的值是未知的...

你有什么建议吗?

scala apache-kafka apache-spark apache-spark-sql spark-structured-streaming

12
推荐指数
1
解决办法
5339
查看次数