使用python将csv转换为镶木地板文件

inq*_*mer 19 python csv parquet

我想将.csv文件转换为.parquet文件.
csv文件(Temp.csv)具有以下格式

1,Jon,Doe,Denver
Run Code Online (Sandbox Code Playgroud)

我使用以下python代码将其转换为镶木地板

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
import os

if __name__ == "__main__":
    sc = SparkContext(appName="CSV2Parquet")
    sqlContext = SQLContext(sc)

    schema = StructType([
            StructField("col1", IntegerType(), True),
            StructField("col2", StringType(), True),
            StructField("col3", StringType(), True),
            StructField("col4", StringType(), True)])
    dirname = os.path.dirname(os.path.abspath(__file__))
    csvfilename = os.path.join(dirname,'Temp.csv')    
    rdd = sc.textFile(csvfilename).map(lambda line: line.split(","))
    df = sqlContext.createDataFrame(rdd, schema)
    parquetfilename = os.path.join(dirname,'output.parquet')    
    df.write.mode('overwrite').parquet(parquetfilename)
Run Code Online (Sandbox Code Playgroud)

结果只是一个名为的文件夹,output.parquet而不是我正在寻找的镶木地板文件,然后在控制台上出现以下错误.

CSV到Parquet错误

我也尝试运行以下代码来面对类似的问题.

from pyspark.sql import SparkSession
import os

spark = SparkSession \
    .builder \
    .appName("Protob Conversion to Parquet") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

# read csv
dirname = os.path.dirname(os.path.abspath(__file__))
csvfilename = os.path.join(dirname,'Temp.csv')    
df = spark.read.csv(csvfilename)

# Displays the content of the DataFrame to stdout
df.show()
parquetfilename = os.path.join(dirname,'output.parquet')    
df.write.mode('overwrite').parquet(parquetfilename)
Run Code Online (Sandbox Code Playgroud)

怎么做到最好?使用windows,python 2.7.

Uwe*_*orn 20

使用这些包pyarrow,pandas您可以在不使用后台JVM的情况下将CSV转换为Parquet:

import pandas as pd
df = pd.read_csv('example.csv')
df.to_parquet('output.parquet')
Run Code Online (Sandbox Code Playgroud)

您将运行的一个限制pyarrow是仅适用于Windows上的Python 3.5+.使用Linux/OSX以Python 2运行代码或将Windows安装程序升级到Python 3.6.

  • 这是将一个文件转换为一个镶木地板文件的非常简单的方法,但是如果我们有多个csv文件并且想要将其解析为一个镶木地板文件怎么办? (2认同)
  • @Zombraz,您可以循环遍历文件并将每个文件转换为镶木地板,如果您正在寻找Python之外的任何内容,AWS EMR上的hive可以很好地将csv转换为镶木地板 (2认同)

tar*_*ras 14

您可以仅使用 pyarrow 将 csv 转换为 parquet - 不使用 pandas。当您需要最大限度地减少代码依赖性(例如使用 AWS Lambda)时,它可能会很有用。

import pyarrow.csv as pv
import pyarrow.parquet as pq

table = pv.read_csv(filename)
pq.write_table(table, filename.replace('csv', 'parquet'))
Run Code Online (Sandbox Code Playgroud)

请参阅 pyarrow 文档进行微调read_csvwrite_table功能。


小智 13

import boto3
import pandas as pd
import pyarrow as pa
from s3fs import S3FileSystem
import pyarrow.parquet as pq

s3 = boto3.client('s3',region_name='us-east-2')
obj = s3.get_object(Bucket='ssiworkoutput', Key='file_Folder/File_Name.csv')
df = pd.read_csv(obj['Body'])

table = pa.Table.from_pandas(df)

output_file = "s3://ssiworkoutput/file/output.parquet"  # S3 Path need to mention
s3 = S3FileSystem()

pq.write_to_dataset(table=table,
                    root_path=output_file,partition_cols=['Year','Month'],
                    filesystem=s3)

print("File converted from CSV to parquet completed")
Run Code Online (Sandbox Code Playgroud)

  • 如何在 aws 上安装 pyarrow 而不会出现包大小问题? (2认同)

Pow*_*ers 9

有几种不同的方法可以使用 Python 将 CSV 文件转换为 Parquet。

Uwe L. Korn 的 Pandas 方法非常有效。

如果您想将多个 CSV 文件转换为多个 Parquet/单个 Parquet 文件,请使用 Dask。这会将多个 CSV 文件转换为两个 Parquet 文件:

import dask.dataframe as dd

df = dd.read_csv('./data/people/*.csv')
df = df.repartition(npartitions=4)
df.to_parquet('./tmp/people_parquet4')
Run Code Online (Sandbox Code Playgroud)

df.repartition(npartitions=1)如果您只想输出一个 Parquet 文件,也可以使用。有关使用 Dask 将 CSV 转换为 Parquet 的更多信息 [此处][1]。

这是在 Spark 环境中工作的 PySpark 片段:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
  .master("local") \
  .appName("parquet_example") \
  .getOrCreate()

df = spark.read.csv('data/us_presidents.csv', header = True)
df.repartition(1).write.mode('overwrite').parquet('tmp/pyspark_us_presidents')
Run Code Online (Sandbox Code Playgroud)

您还可以在 Spark 环境中使用Koalas

import databricks.koalas as ks

df = ks.read_csv('data/us_presidents.csv')
df.to_parquet('tmp/koala_us_presidents')
Run Code Online (Sandbox Code Playgroud)


sha*_*359 8

处理大于内存的 CSV 文件

下面的代码将 CSV 转换为 Parquet,而不将整个 csv 文件加载到内存中

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

new_schema = pa.schema([
    ('col1', pa.int64()),
    ('col2', pa.int64()),
    ('newcol', pa.int64())
])

csv_column_list = ['col1', 'col2']

with pq.ParquetWriter('my_parq_data.parquet', schema=new_schema) as writer:
    with pd.read_csv('my_data.csv', header=None, names=csv_column_list, chunksize=100000) as reader:
        for df in reader:
            # transformation: transform df by adding a new static column with column name 'newcol' and value 9999999
            df['newcol'] = 9999999
            # convert pandas df to record batch
            transformed_batch = pa.RecordBatch.from_pandas(df, schema=new_schema)
            writer.write_batch(transformed_batch)  
Run Code Online (Sandbox Code Playgroud)

上面的代码:

  1. 分块读取大型CSV 文件。
  2. 通过添加新列来转换数据框。
  3. 将 df 转换为箭头记录批次
  4. 将转换后的箭头批次作为新行组写入 parquet 文件。

注意:不要将块大小保持得太小。这将导致压缩效果不佳,因为块大小也对应于新镶木地板文件中的行组大小。