在将csv文件作为数据框读取时提供模式

Pa1*_*Pa1 35 scala dataframe apache-spark apache-spark-sql spark-csv

我试图将csv文件读入数据帧.我知道我的数据帧的架构应该是什么,因为我知道我的csv文件.另外我使用spark csv包来读取文件.我试图指定如下的架构.

val pagecount = sqlContext.read.format("csv")
  .option("delimiter"," ").option("quote","")
  .option("schema","project: string ,article: string ,requests: integer ,bytes_served: long")
  .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")
Run Code Online (Sandbox Code Playgroud)

但是当我检查我创建的数据框架的模式时,它似乎采用了自己的模式.我做错了吗?如何制作火花来接收我提到的架构?

> pagecount.printSchema
root
|-- _c0: string (nullable = true)
|-- _c1: string (nullable = true)
|-- _c2: string (nullable = true)
|-- _c3: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

Aru*_*ulu 44

请尝试以下操作,您无需指定架构.当你将inferSchema设为true时,它应该从你的csv文件中获取它.

val pagecount = sqlContext.read.format("csv")
  .option("delimiter"," ").option("quote","")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")
Run Code Online (Sandbox Code Playgroud)

如果要手动指定架构,则需要执行以下操作

import org.apache.spark.sql.types._

val customSchema = StructType(Array(
  StructField("project", StringType, true),
  StructField("article", StringType, true),
  StructField("requests", IntegerType, true),
  StructField("bytes_served", DoubleType, true))
)

val pagecount = sqlContext.read.format("csv")
  .option("delimiter"," ").option("quote","")
  .option("header", "true")
  .schema(customSchema)
  .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")
Run Code Online (Sandbox Code Playgroud)

  • 理论上,我知道我们可以提及该模式,但我不知道如何在语法方面提及该模式,我可以寻找任何帮助吗?我参考了官方文档,它没有提到这种情况,也没有太多例子 (2认同)

Alb*_*rra 9

我在分析中使用了Arunakiran Nulu提供的解决方案(参见代码).尽管它能够为列分配正确的类型,但返回的所有值都是null.以前,我尝试过该选项.option("inferSchema", "true"),它会在数据框中返回正确的值(尽管类型不同).

val customSchema = StructType(Array(
    StructField("numicu", StringType, true),
    StructField("fecha_solicitud", TimestampType, true),
    StructField("codtecnica", StringType, true),
    StructField("tecnica", StringType, true),
    StructField("finexploracion", TimestampType, true),
    StructField("ultimavalidacioninforme", TimestampType, true),
    StructField("validador", StringType, true)))

val df_explo = spark.read
        .format("csv")
        .option("header", "true")
        .option("delimiter", "\t")
        .option("timestampFormat", "yyyy/MM/dd HH:mm:ss") 
        .schema(customSchema)
        .load(filename)
Run Code Online (Sandbox Code Playgroud)

结果

root


|-- numicu: string (nullable = true)
 |-- fecha_solicitud: timestamp (nullable = true)
 |-- codtecnica: string (nullable = true)
 |-- tecnica: string (nullable = true)
 |-- finexploracion: timestamp (nullable = true)
 |-- ultimavalidacioninforme: timestamp (nullable = true)
 |-- validador: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

并且表格是:

|numicu|fecha_solicitud|codtecnica|tecnica|finexploracion|ultimavalidacioninforme|validador|
+------+---------------+----------+-------+--------------+-----------------------+---------+
|  null|           null|      null|   null|          null|                   null|     null|
|  null|           null|      null|   null|          null|                   null|     null|
|  null|           null|      null|   null|          null|                   null|     null|
|  null|           null|      null|   null|          null|                   null|     null|
Run Code Online (Sandbox Code Playgroud)

  • 它看起来像`.option("timestampFormat", "yyyy/mm/dd HH:mm:ss")` 应该改为`.option("timestampFormat", "yyyy/MM/dd HH:mm:ss")` . [注意月份的大写“MM”]否则它会将月份数字解释为时间戳的_分钟_。 (2认同)
  • 如果您的 DateType 列可能包含 'null' 值,请设置 `.option("nullValue", "null")` 否则它会考虑整行具有空值。 (2认同)

use*_*410 9

对于那些有兴趣在Python中执行此操作的人,这里是一个有效的版本。

customSchema = StructType([
    StructField("IDGC", StringType(), True),        
    StructField("SEARCHNAME", StringType(), True),
    StructField("PRICE", DoubleType(), True)
])
productDF = spark.read.load('/home/ForTesting/testProduct.csv', format="csv", header="true", sep='|', schema=customSchema)

testProduct.csv
ID|SEARCHNAME|PRICE
6607|EFKTON75LIN|890.88
6612|EFKTON100HEN|55.66
Run Code Online (Sandbox Code Playgroud)

希望这可以帮助。


X.X*_*X.X 8

感谢@Nulu的回答,它适用于pyspark,只需要很少的调整

from pyspark.sql.types import LongType, StringType, StructField, StructType, BooleanType, ArrayType, IntegerType

customSchema = StructType(Array(
    StructField("project", StringType, true),
    StructField("article", StringType, true),
    StructField("requests", IntegerType, true),
    StructField("bytes_served", DoubleType, true)))

pagecount = sc.read.format("com.databricks.spark.csv")
         .option("delimiter"," ")
         .option("quote","")
         .option("header", "false")
         .schema(customSchema)
         .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")
Run Code Online (Sandbox Code Playgroud)

  • 该数组不是 PySpark 类型。您应该使用 Python 数组 [] 而不是“Array”。并且在Python中使用“True”而不是“true”。 (4认同)

ggo*_*don 8

之前的解决方案使用了自定义 StructType。

使用 spark-sql 2.4.5(scala 版本 2.12.10),现在可以使用schema函数将模式指定为字符串

import org.apache.spark.sql.SparkSession;
Run Code Online (Sandbox Code Playgroud)
val sparkSession = SparkSession.builder()
            .appName("sample-app")
            .master("local[2]")
            .getOrCreate();

val pageCount = sparkSession.read
  .format("csv")
  .option("delimiter","|")
  .option("quote","")
  .schema("project string ,article string ,requests integer ,bytes_served long")
  .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")
Run Code Online (Sandbox Code Playgroud)


Sur*_*nti 7

模式定义为简单字符串

以防万一有人对作为带有日期时间戳的简单字符串的模式定义感兴趣

从终端或外壳创建数据文件

echo " 
2019-07-02 22:11:11.000999, 01/01/2019, Suresh, abc  
2019-01-02 22:11:11.000001, 01/01/2020, Aadi, xyz 
" > data.csv
Run Code Online (Sandbox Code Playgroud)

将模式定义为字符串

    user_schema = 'timesta TIMESTAMP,date DATE,first_name STRING , last_name STRING'
Run Code Online (Sandbox Code Playgroud)

读取数据

    df = spark.read.csv(path='data.csv', schema = user_schema, sep=',', dateFormat='MM/dd/yyyy',timestampFormat='yyyy-MM-dd HH:mm:ss.SSSSSS')

    df.show(10, False)

    +-----------------------+----------+----------+---------+
    |timesta                |date      |first_name|last_name|
    +-----------------------+----------+----------+---------+
    |2019-07-02 22:11:11.999|2019-01-01| Suresh   | abc     |
    |2019-01-02 22:11:11.001|2020-01-01| Aadi     | xyz     |
    +-----------------------+----------+----------+---------+
Run Code Online (Sandbox Code Playgroud)

请注意明确定义模式而不是让 spark 推断模式也提高了 spark 读取性能。