Tags: scala, dataframe, apache-spark, apache-spark-sql, spark-csv
I am trying to read a CSV file into a DataFrame. I know what the schema of my DataFrame should be, since I know my CSV file, and I am using the spark-csv package to read it. I tried to specify the schema like this:
val pagecount = sqlContext.read.format("csv")
.option("delimiter"," ").option("quote","")
.option("schema","project: string ,article: string ,requests: integer ,bytes_served: long")
.load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")
But when I check the schema of the DataFrame I created, it seems to have inferred its own schema. Am I doing something wrong? How do I make Spark pick up the schema I specified?
> pagecount.printSchema
root
|-- _c0: string (nullable = true)
|-- _c1: string (nullable = true)
|-- _c2: string (nullable = true)
|-- _c3: string (nullable = true)
Arunakiran Nulu 44
Try the following; you don't need to specify the schema. When you set inferSchema to true, Spark should pick the schema up from your CSV file.
val pagecount = sqlContext.read.format("csv")
.option("delimiter"," ").option("quote","")
.option("header", "true")
.option("inferSchema", "true")
.load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")
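To double-check what inferSchema actually produced, you can print the resulting schema and sample a few rows (a minimal verification sketch using the pagecount DataFrame from above):

// inspect the inferred column names and types
pagecount.printSchema()
// sanity-check the parsed values on a handful of rows
pagecount.show(5, false)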
If you want to specify the schema manually, you can do it as follows:
import org.apache.spark.sql.types._
val customSchema = StructType(Array(
StructField("project", StringType, true),
StructField("article", StringType, true),
StructField("requests", IntegerType, true),
StructField("bytes_served", DoubleType, true))
)
val pagecount = sqlContext.read.format("csv")
.option("delimiter"," ").option("quote","")
.option("header", "true")
.schema(customSchema)
.load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")
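Alternatively, the same schema can be built with StructType's add builder instead of wrapping an Array of fields; a sketch (LongType is used for bytes_served here, matching the type the question asked for, whereas the answer above uses DoubleType):

import org.apache.spark.sql.types._

val customSchema = new StructType()
  .add("project", StringType, nullable = true)
  .add("article", StringType, nullable = true)
  .add("requests", IntegerType, nullable = true)
  .add("bytes_served", LongType, nullable = true)  // declared as long in the question's schema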
I used the solution provided by Arunakiran Nulu in my analysis (see code below). Although it assigned the correct types to the columns, all of the returned values were null. Previously, I had tried the option .option("inferSchema", "true"), and it returned the correct values in the DataFrame (although with different types).
val customSchema = StructType(Array(
StructField("numicu", StringType, true),
StructField("fecha_solicitud", TimestampType, true),
StructField("codtecnica", StringType, true),
StructField("tecnica", StringType, true),
StructField("finexploracion", TimestampType, true),
StructField("ultimavalidacioninforme", TimestampType, true),
StructField("validador", StringType, true)))
val df_explo = spark.read
.format("csv")
.option("header", "true")
.option("delimiter", "\t")
.option("timestampFormat", "yyyy/MM/dd HH:mm:ss")
.schema(customSchema)
.load(filename)
Result:
root
|-- numicu: string (nullable = true)
|-- fecha_solicitud: timestamp (nullable = true)
|-- codtecnica: string (nullable = true)
|-- tecnica: string (nullable = true)
|-- finexploracion: timestamp (nullable = true)
|-- ultimavalidacioninforme: timestamp (nullable = true)
|-- validador: string (nullable = true)
And the table is:
|numicu|fecha_solicitud|codtecnica|tecnica|finexploracion|ultimavalidacioninforme|validador|
+------+---------------+----------+-------+--------------+-----------------------+---------+
| null| null| null| null| null| null| null|
| null| null| null| null| null| null| null|
| null| null| null| null| null| null| null|
| null| null| null| null| null| null| null|
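A plausible cause for the all-null rows above is that every row fails to parse against the declared schema (for example, the delimiter or timestampFormat does not match the file), and Spark's default PERMISSIVE mode then replaces each malformed row with nulls. One way to surface the underlying parse error instead of silent nulls is to read in FAILFAST mode; a sketch, assuming the same filename and customSchema as above:

val strictDf = spark.read
  .format("csv")
  .option("header", "true")
  .option("delimiter", "\t")
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss")
  .option("mode", "FAILFAST")  // PERMISSIVE (the default) silently nulls out malformed rows
  .schema(customSchema)
  .load(filename)

strictDf.show(5, false)  // throws an exception describing the malformed record if parsing fails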
For anyone interested in doing this in Python, here is a working version.
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

customSchema = StructType([
StructField("IDGC", StringType(), True),
StructField("SEARCHNAME", StringType(), True),
StructField("PRICE", DoubleType(), True)
])
productDF = spark.read.load('/home/ForTesting/testProduct.csv', format="csv", header="true", sep='|', schema=customSchema)
testProduct.csv
ID|SEARCHNAME|PRICE
6607|EFKTON75LIN|890.88
6612|EFKTON100HEN|55.66
Hope this helps.
Thanks to @Nulu's answer; it works for pyspark with only a few tweaks:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

customSchema = StructType([
    StructField("project", StringType(), True),
    StructField("article", StringType(), True),
    StructField("requests", IntegerType(), True),
    StructField("bytes_served", DoubleType(), True)])

pagecount = (sqlContext.read.format("com.databricks.spark.csv")
    .option("delimiter", " ")
    .option("quote", "")
    .option("header", "false")
    .schema(customSchema)
    .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000"))
The previous solutions used a custom StructType.
With spark-sql 2.4.5 (Scala 2.12.10), it is now possible to specify the schema as a string using the schema function:
import org.apache.spark.sql.SparkSession;
val sparkSession = SparkSession.builder()
.appName("sample-app")
.master("local[2]")
.getOrCreate();
val pageCount = sparkSession.read
.format("csv")
.option("delimiter","|")
.option("quote","")
.schema("project string ,article string ,requests integer ,bytes_served long")
.load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")
The schema is defined as a simple string.
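If the string form needs to be turned into an actual StructType object (for example to reuse it programmatically), recent Spark versions can parse the same DDL-style string; a sketch:

import org.apache.spark.sql.types.StructType

// parse the DDL string into a StructType
val ddlSchema = StructType.fromDDL("project string, article string, requests integer, bytes_served long")

val pageCount = sparkSession.read
  .format("csv")
  .option("delimiter", "|")
  .option("quote", "")
  .schema(ddlSchema)
  .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")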
In case anyone is interested in defining the schema as a simple string that includes date and timestamp columns:
Create the data file from a terminal or shell:
echo "
2019-07-02 22:11:11.000999, 01/01/2019, Suresh, abc
2019-01-02 22:11:11.000001, 01/01/2020, Aadi, xyz
" > data.csv
Define the schema as a string:
user_schema = 'timesta TIMESTAMP,date DATE,first_name STRING , last_name STRING'
Read the data:
df = spark.read.csv(path='data.csv', schema = user_schema, sep=',', dateFormat='MM/dd/yyyy',timestampFormat='yyyy-MM-dd HH:mm:ss.SSSSSS')
df.show(10, False)
+-----------------------+----------+----------+---------+
|timesta |date |first_name|last_name|
+-----------------------+----------+----------+---------+
|2019-07-02 22:11:11.999|2019-01-01| Suresh | abc |
|2019-01-02 22:11:11.001|2020-01-01| Aadi | xyz |
+-----------------------+----------+----------+---------+
Note that defining the schema explicitly, instead of letting Spark infer it, also improves Spark read performance.
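A rough way to see that difference, assuming a SparkSession named spark, a reasonably large CSV at filename, and a customSchema like the ones above (a timing sketch, not a rigorous benchmark):

// inferSchema forces an extra pass over the file to sample the data;
// an explicit schema lets Spark skip that scan entirely.
def timed[T](label: String)(block: => T): T = {
  val start = System.nanoTime()
  val result = block
  println(s"$label took ${(System.nanoTime() - start) / 1e9} s")
  result
}

timed("read with inferSchema") {
  spark.read.option("header", "true").option("inferSchema", "true").csv(filename).count()
}

timed("read with explicit schema") {
  spark.read.option("header", "true").schema(customSchema).csv(filename).count()
}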