Reading data from a URL using Spark on the Databricks platform

S12*_*DB8 6 scala apache-spark apache-spark-sql pyspark databricks

I'm trying to read data from a URL with Spark on the Databricks Community Edition platform. I tried spark.read.csv and SparkFiles, but I'm still missing some simple point.

url = "https://raw.githubusercontent.com/thomaspernet/data_csv_r/master/data/adult.csv"
from pyspark import SparkFiles
spark.sparkContext.addFile(url)
# sc.addFile(url)
# sqlContext = SQLContext(sc)
# df = sqlContext.read.csv(SparkFiles.get("adult.csv"), header=True, inferSchema=True)

df = spark.read.csv(SparkFiles.get("adult.csv"), header=True, inferSchema=True)

I get a path-related error:

Path does not exist: dbfs:/local_disk0/spark-9f23ed57-133e-41d5-91b2-12555d641961/userFiles-d252b3ba-499c-42c9-be48-96358357fb75/adult.csv;'

I also tried another approach:

val content = scala.io.Source.fromURL("https://raw.githubusercontent.com/thomaspernet/data_csv_r/master/data/adult.csv").mkString

 # val list = content.split("\n").filter(_ != "")
   val rdd = sc.parallelize(content)
   val df = rdd.toDF

  File "<command-332010883169993>", line 16
    val content = scala.io.Source.fromURL("https://raw.githubusercontent.com/thomaspernet/data_csv_r/master/data/adult.csv").mkString
              ^
SyntaxError: invalid syntax

The data should either land directly in a Databricks folder, or I should be able to load it straight from the URL with spark.read. Any suggestions?

vik*_*ana 13

Try this:

url = "https://raw.githubusercontent.com/thomaspernet/data_csv_r/master/data/adult.csv"
from pyspark import SparkFiles
spark.sparkContext.addFile(url)

df = spark.read.csv("file://" + SparkFiles.get("adult.csv"), header=True, inferSchema=True)

Just fetching a few columns from your CSV URL:

df.select("age","workclass","fnlwgt","education").show(10);
+---+----------------+------+---------+
|age|       workclass|fnlwgt|education|
+---+----------------+------+---------+
| 39|       State-gov| 77516|Bachelors|
| 50|Self-emp-not-inc| 83311|Bachelors|
| 38|         Private|215646|  HS-grad|
| 53|         Private|234721|     11th|
| 28|         Private|338409|Bachelors|
| 37|         Private|284582|  Masters|
| 49|         Private|160187|      9th|
| 52|Self-emp-not-inc|209642|  HS-grad|
| 31|         Private| 45781|  Masters|
| 42|         Private|159449|Bachelors|
+---+----------------+------+---------+

SparkFiles.get returns the absolute path of a file that is local to the driver or a worker. Without the file:// scheme, Databricks resolves that path against dbfs:/, which is why Spark couldn't find the file.
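
A minimal way to see this, sketched here under the assumption of the same spark session and url variable as in the question: the downloaded file sits on the driver's local disk (the /local_disk0/... path from the error message), so the file:// prefix is what points Spark at the local filesystem instead of dbfs:/.

import os
from pyspark import SparkFiles

spark.sparkContext.addFile(url)           # downloads adult.csv to the driver's local disk
local_path = SparkFiles.get("adult.csv")  # absolute local path, like /local_disk0/spark-.../adult.csv
print(os.path.exists(local_path))         # True: the file exists on the local filesystem
# prefixing file:// makes Spark read from the local filesystem rather than dbfs:/
df = spark.read.csv("file://" + local_path, header=True, inferSchema=True)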


Ram*_*ram 3

The answer above works, but it can sometimes be error-prone: SparkFiles.get may return null.

Option #1 below is a more reliable way to get a file from any URL or public S3 location.


Option 1:

IOUtils.toString will do the trick; see the Apache Commons IO documentation. The commons-io jar is already present on any Spark cluster, whether it's Databricks or any other Spark installation.

Below is the Scala way of doing it... I took a raw GitHub CSV file for this example... change it as per your requirements.

import org.apache.commons.io.IOUtils // commons-io ships with every Spark cluster, no need to add the jar
import java.net.URL
import spark.implicits._             // required for .toDS()

val urlfile = new URL("https://raw.githubusercontent.com/lrjoshi/webpage/master/public/post/c159s.csv")

// download the CSV body on the driver and turn its lines into a Dataset[String]
val testcsvgit = IOUtils.toString(urlfile, "UTF-8").split("\n").toList.toDS()

// spark.read.csv can parse a Dataset[String] of CSV rows directly
val testcsv = spark.read
  .option("header", true)
  .option("inferSchema", true)
  .csv(testcsvgit)
testcsv.show()

Result:

+-----------+------+----+----+---+-----+
|Experiment |Virus |Cell| MOI|hpi|Titer|
+-----------+------+----+----+---+-----+
|      EXP I| C159S|OFTu| 0.1|  0| 4.75|
|      EXP I| C159S|OFTu| 0.1|  6| 2.75|
|      EXP I| C159S|OFTu| 0.1| 12| 2.75|
|      EXP I| C159S|OFTu| 0.1| 24|  5.0|
|      EXP I| C159S|OFTu| 0.1| 48|  5.5|
|      EXP I| C159S|OFTu| 0.1| 72|  7.0|
|      EXP I| C159S| STU| 0.1|  0| 4.75|
|      EXP I| C159S| STU| 0.1|  6| 3.75|
|      EXP I| C159S| STU| 0.1| 12|  4.0|
|      EXP I| C159S| STU| 0.1| 24| 3.75|
|      EXP I| C159S| STU| 0.1| 48| 3.25|
|      EXP I| C159S| STU| 0.1| 72| 3.25|
|      EXP I| C159S|OFTu|10.0|  0|  6.5|
|      EXP I| C159S|OFTu|10.0|  6| 4.75|
|      EXP I| C159S|OFTu|10.0| 12| 4.75|
|      EXP I| C159S|OFTu|10.0| 24| 6.25|
|      EXP I| C159S|OFTu|10.0| 48|  6.5|
|      EXP I| C159S|OFTu|10.0| 72|  7.0|
|      EXP I| C159S| STU|10.0|  0|  7.0|
|      EXP I| C159S| STU|10.0|  6| 4.75|
+-----------+------+----+----+---+-----+
only showing top 20 rows
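For pyspark users, a rough equivalent of Option 1 (a sketch, assuming a live spark session; the URL is the same sample file): fetch the body on the driver and let spark.read.csv parse an RDD of CSV lines directly, with no temp file.

from urllib.request import urlopen

url = "https://raw.githubusercontent.com/lrjoshi/webpage/master/public/post/c159s.csv"
lines = urlopen(url).read().decode("utf-8").splitlines()  # download on the driver
rdd = spark.sparkContext.parallelize(lines)               # distribute the CSV rows
df = spark.read.csv(rdd, header=True, inferSchema=True)   # spark.read.csv accepts an RDD of strings
df.show()
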

Option 2: In Scala

import org.apache.spark.SparkFiles

val urlfile = "https://raw.githubusercontent.com/lrjoshi/webpage/master/public/post/c159s.csv"
spark.sparkContext.addFile(urlfile)  // downloads the file to local disk

val df = spark.read
  .option("inferSchema", true)
  .option("header", true)
  .csv("file://" + SparkFiles.get("c159s.csv"))  // file:// forces the local filesystem
df.show()

Result: same as Option #1, as shown below.

+-----------+------+----+----+---+-----+
|Experiment |Virus |Cell| MOI|hpi|Titer|
+-----------+------+----+----+---+-----+
|      EXP I| C159S|OFTu| 0.1|  0| 4.75|
|      EXP I| C159S|OFTu| 0.1|  6| 2.75|
|      EXP I| C159S|OFTu| 0.1| 12| 2.75|
|      EXP I| C159S|OFTu| 0.1| 24|  5.0|
|      EXP I| C159S|OFTu| 0.1| 48|  5.5|
|      EXP I| C159S|OFTu| 0.1| 72|  7.0|
|      EXP I| C159S| STU| 0.1|  0| 4.75|
|      EXP I| C159S| STU| 0.1|  6| 3.75|
|      EXP I| C159S| STU| 0.1| 12|  4.0|
|      EXP I| C159S| STU| 0.1| 24| 3.75|
|      EXP I| C159S| STU| 0.1| 48| 3.25|
|      EXP I| C159S| STU| 0.1| 72| 3.25|
|      EXP I| C159S|OFTu|10.0|  0|  6.5|
|      EXP I| C159S|OFTu|10.0|  6| 4.75|
|      EXP I| C159S|OFTu|10.0| 12| 4.75|
|      EXP I| C159S|OFTu|10.0| 24| 6.25|
|      EXP I| C159S|OFTu|10.0| 48|  6.5|
|      EXP I| C159S|OFTu|10.0| 72|  7.0|
|      EXP I| C159S| STU|10.0|  0|  7.0|
|      EXP I| C159S| STU|10.0|  6| 4.75|
+-----------+------+----+----+---+-----+
only showing top 20 rows


If you are on Windows, you should use .csv("file:///" + ...) with three slashes instead of file://.
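
For example, in pyspark (a hypothetical Windows session, same c159s.csv file as above):

df = spark.read.csv("file:///" + SparkFiles.get("c159s.csv"), header=True, inferSchema=True)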

Option 3: Using the httpclient library

import org.apache.http.client.methods.HttpGet
import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils
import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadFromUrl {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ReadFromUrl")
      .master("local[*]")
      .getOrCreate()

    // fetch the CSV body over HTTP with Apache HttpClient
    val url = "https://people.sc.fsu.edu/~jburkardt/data/csv/airtravel.csv"
    val httpClient = HttpClients.createDefault()
    val httpGet = new HttpGet(url)
    val response = httpClient.execute(httpGet)
    val data = EntityUtils.toString(response.getEntity)
    response.close()
    httpClient.close()

    // spill the body to a temp file so spark.read can load it from a path
    val tempFile = java.io.File.createTempFile("airtravel", ".csv")
    tempFile.deleteOnExit()
    val writer = new java.io.PrintWriter(tempFile)
    writer.write(data)
    writer.close()

    val df: DataFrame = spark.read
      .format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load(tempFile.getAbsolutePath)

    df.show()

    spark.stop()
  }
}


Result:

+-----+-------+-------+-------+
|Month| "1958"| "1959"| "1960"|
+-----+-------+-------+-------+
|  JAN|  340.0|  360.0|  417.0|
|  FEB|  318.0|  342.0|  391.0|
|  MAR|  362.0|  406.0|  419.0|
|  APR|  348.0|  396.0|  461.0|
|  MAY|  363.0|  420.0|  472.0|
|  JUN|  435.0|  472.0|  535.0|
|  JUL|  491.0|  548.0|  622.0|
|  AUG|  505.0|  559.0|  606.0|
|  SEP|  404.0|  463.0|  508.0|
|  OCT|  359.0|  407.0|  461.0|
|  NOV|  310.0|  362.0|  390.0|
|  DEC|  337.0|  405.0|  432.0|
+-----+-------+-------+-------+