Spark: writing a DataFrame to an S3 bucket

Rag*_*ala 3 scala amazon-s3 amazon-web-services apache-spark apache-spark-sql

I am trying to write DF data to an S3 bucket, and that works as expected. Now I want to write to the S3 bucket based on a condition.


In the dataframe I have a column called Flag, whose values are T and F. The condition is: if Flag is F, the data should be written to the S3 bucket; otherwise it should not. Please see the details below.


DF data:

1015,2017/08,新潟,101,SW,39,1015,2017/08,山形,101,SW,10,29,74.35897435897436,11.0,F
1015,2017/08,新潟,101,SW,39,1015,2017/08,大分,101,SW,14,25,64.1025641025641,15.4,F
1015,2017/08,新潟,101,SW,39,1015,2017/08,山口,101,SW,6,33,84.61538461538461,6.6,T
1015,2017/08,新潟,101,SW,39,1015,2017/08,愛媛,101,SW,5,34,87.17948717948718,5.5,T
1015,2017/08,新潟,101,SW,39,1015,2017/08,神奈川,101,SW,114,75,192.30769230769232,125.4,F
1015,2017/08,新潟,101,SW,39,1015,2017/08,富山,101,SW,12,27,69.23076923076923,13.2,F
1015,2017/08,新潟,101,SW,39,1015,2017/08,高知,101,SW,3,36,92.3076923076923,3.3,T
1015,2017/08,新潟,101,SW,39,1015,2017/08,岩手,101,SW,11,28,71.7948717948718,12.1,F
1015,2017/08,新潟,101,SW,39,1015,2017/08,三重,101,SW,45,6,15.384615384615385,49.5,F
1015,2017/08,新潟,101,SW,39,1015,2017/08,京都,101,SW,23,16,41.02564102564102,25.3,F
1015,2017/08,新潟,101,SW,39,1015,2017/08,静岡,101,SW,32,7,17.94871794871795,35.2,F
1015,2017/08,新潟,101,SW,39,1015,2017/08,鹿児島,101,SW,18,21,53.84615384615385,19.8,F
1015,2017/08,新潟,101,SW,39,1015,2017/08,福島,101,SW,17,22,56.41025641025641,18.7,F

Code:

val df = spark.read.format("csv").option("header","true").option("inferSchema","true").load("s3a://test_system/transcation.csv")
df.createOrReplaceTempView("data")
val res = spark.sql("select count(*) from data")
res.show(10)
res.coalesce(1).write.format("csv").option("header","true").mode("Overwrite")
  .save("s3a://test_system/Output/Test_Result")
res.createOrReplaceTempView("res1")
val res2 = spark.sql("select distinct flag from res1 where flag = 'F'")
if (res2 === 'F')
{
  // writing the raw data (the transcation.csv contents) to the S3 bucket
  df.write.format("csv").option("header","true").mode("Overwrite")
    .save("s3a://test_system/Output/Test_Result/rawdata")
}

I am trying this approach, but it does not export the df data to the S3 bucket. How can I export/write data to the S3 bucket based on a condition?


Any help is much appreciated.


use*_*607 5

I assume you want to write the dataframe when an "F" flag is present in the dataframe.

val df = spark.read.format("csv").option("header","true").option("inferSchema","true").load("s3a://test_system/transcation.csv")
df.createOrReplaceTempView("data")
val res = spark.sql("select count(*) from data")
res.show(10)
res.coalesce(1).write.format("csv").option("header","true").mode("Overwrite")
  .save("s3a://test_system/Output/Test_Result")
res.createOrReplaceTempView("res1")

Here we query the data table, because the res1 table is only the count you created above. Also, from the resulting dataframe we select just the first row with the first() function, and take the first column of that row with getAs[String](0).

val res2 = spark.sql("select distinct flag from data where flag = 'F'").first().getAs[String](0)

println("Printing out res2 = " + res2)
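As a side note, first() throws a NoSuchElementException when the query returns no rows. Here is a minimal plain-Scala sketch of the same flag check, where the in-memory rows and the (prefecture, flag) layout are stand-ins for the DataFrame, guarding the empty case with headOption:

```scala
// Hypothetical in-memory stand-in for the DataFrame rows: (prefecture, flag)
val rows = Seq(("山形", "F"), ("山口", "T"), ("愛媛", "T"))

// Equivalent of `select distinct flag from data where flag = 'F'` + first(),
// except headOption returns None instead of throwing when nothing matches.
val res2: Option[String] = rows.map(_._2).distinct.filter(_ == "F").headOption

res2 match {
  case Some(flag) => println(s"Found flag = $flag") // safe to compare with "F"
  case None       => println("No 'F' rows; skip the write")
}
```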

Here we compare the string extracted above against the string "F". Remember that in Scala "F" is a String, while 'F' is a Char.
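That distinction is easy to verify on its own; a small sketch:

```scala
// In Scala, "F" is a java.lang.String and 'F' is a Char.
// Comparing a String to a Char with == is always false (the compiler warns),
// which is why the original `if (res2 === 'F')` never fires.
val s: String = "F"
val c: Char = 'F'
println(s.equals("F"))   // true: what the corrected if-condition does
println(s == c.toString) // true: compare like with like
```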

if (res2.equals("F"))
{
  println("Inside the if loop")
  // writing the raw data (the transcation.csv contents) to the S3 bucket
  df.write.format("csv").option("header","true").mode("Overwrite")
    .save("s3a://test_system/Output/Test_Result/rawdata")
}

  • I would avoid Overwrite with S3, since it issues queries on every write, which makes the operation extremely intensive. (3 upvotes)
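One way to act on that advice is to avoid mode("Overwrite") altogether and write each run to a fresh prefix so existing objects are never deleted. A sketch, where the timestamped-subdirectory naming is my assumption, not part of the original answer (the base path comes from the question):

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Build a unique output prefix per run instead of overwriting in place.
val fmt = DateTimeFormatter.ofPattern("yyyyMMdd_HHmmss")
def outputPath(base: String, now: LocalDateTime): String =
  s"$base/rawdata_${now.format(fmt)}"

val path = outputPath("s3a://test_system/Output/Test_Result",
                      LocalDateTime.of(2017, 8, 1, 12, 0, 0))
println(path) // s3a://test_system/Output/Test_Result/rawdata_20170801_120000
// df.write.format("csv").option("header", "true").save(path) // no Overwrite needed
```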