如何将csv文件转换为镶木地板

aut*_*243 18 java bigdata parquet

我是BigData的新手.我需要将csv/txt文件转换为Parquet格式.我搜索了很多,但找不到任何直接的方法.有没有办法实现这一目标?

Pra*_*pta 19

[对于 Python ]

Pandas 现在直接支持它。

刚才看了CSV文件导入数据帧通过大熊猫read_csv和使用编写数据帧到拼花文件to_parquet

  • 为什么你会为 Java 问题提供 python 解决方案? (4认同)
  • 因为已经有一个没有提到 to_parquet (因为它是随 0.21.0 一起发布的)。认为这对于需要基于 python 的解决方案的人可能有用。 (2认同)

ost*_*ach 12

您可以使用Apache Drill,如将CSV文件转换为带有Drill的Apache Parquet中所述.

简单来说:

启动Apache Drill:

$ cd /opt/drill/bin
$ sqlline -u jdbc:drill:zk=local

创建Parquet文件:

-- Set default table format to parquet
ALTER SESSION SET `store.format`='parquet';

-- Create a parquet table containing all data from the CSV table
CREATE TABLE dfs.tmp.`/stats/airport_data/` AS
SELECT
CAST(SUBSTR(columns[0],1,4) AS INT)  `YEAR`,
CAST(SUBSTR(columns[0],5,2) AS INT) `MONTH`,
columns[1] as `AIRLINE`,
columns[2] as `IATA_CODE`,
columns[3] as `AIRLINE_2`,
columns[4] as `IATA_CODE_2`,
columns[5] as `GEO_SUMMARY`,
columns[6] as `GEO_REGION`,
columns[7] as `ACTIVITY_CODE`,
columns[8] as `PRICE_CODE`,
columns[9] as `TERMINAL`,
columns[10] as `BOARDING_AREA`,
CAST(columns[11] AS DOUBLE) as `PASSENGER_COUNT`
FROM dfs.`/opendata/Passenger/SFO_Passenger_Data/*.csv`;

尝试从新的Parquet文件中选择数据:

-- Select data from parquet table
SELECT *
FROM dfs.tmp.`/stats/airport_data/*`

您可以dfs.tmp转到http://localhost:8047/storage/dfs(来源:CSV和Parquet)来更改位置.

  • 我确认这是实现这一目标的最佳和最简单的方法.Apache Hive也可以替代. (3认同)

Pra*_*oya 11

是一个代码的示例代码.

  • 为什么不在答案中添加代码? (8认同)
  • 链接中的示例未提供如何定义架构。查看用于将csv转换为镶木地板的pyspark代码只需几行代码即可完成。可以在这里找到http://blogs.quovantis.com/how-to-convert-csv-to-parquet-files/不确定如何在Java中进行操作 (2认同)

ost*_*ach 10

我已经使用Apache Drill 发布有关如何执行此操作的答案.但是,如果您熟悉Python,现在可以使用PandasPyArrow执行此操作!

安装依赖项

使用pip:

pip install pandas pyarrow
Run Code Online (Sandbox Code Playgroud)

或使用conda:

conda install pandas pyarrow -c conda-forge
Run Code Online (Sandbox Code Playgroud)

将CSV转换为Parquet的块

# csv_to_parquet.py

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

csv_file = '/path/to/my.tsv'
parquet_file = '/path/to/my.parquet'
chunksize = 100_000

csv_stream = pd.read_csv(csv_file, sep='\t', chunksize=chunksize, low_memory=False)

for i, chunk in enumerate(csv_stream):
    print("Chunk", i)
    if i == 0:
        # Guess the schema of the CSV file from the first chunk
        parquet_schema = pa.Table.from_pandas(df=chunk).schema
        # Open a Parquet file for writing
        parquet_writer = pq.ParquetWriter(parquet_file, parquet_schema, compression='snappy')
    # Write CSV chunk to the parquet file
    table = pa.Table.from_pandas(chunk, schema=parquet_schema)
    parquet_writer.write_table(table)

parquet_writer.close()
Run Code Online (Sandbox Code Playgroud)

我没有针对Apache Drill版本对此代码进行基准测试,但根据我的经验,它很快,每秒转换成数万行(当然这取决于CSV文件!).


小智 6

以下代码是使用spark2.0的示例。读取比inferSchema选项快得多。Spark 2.0转换为镶木地板文件的效率比spark1.6高得多。

import org.apache.spark.sql.types._
var df = StructType(Array(StructField("timestamp", StringType, true),StructField("site", StringType, true),StructField("requests", LongType, true) ))
df = spark.read
          .schema(df)
          .option("header", "true")
          .option("delimiter", "\t")
          .csv("/user/hduser/wikipedia/pageviews-by-second-tsv")
df.write.parquet("/user/hduser/wikipedia/pageviews-by-second-parquet")
Run Code Online (Sandbox Code Playgroud)


dom*_*nik 6

我制作了一个小型命令行工具来将 CSV 转换为 Parquet:csv2parquet


Mil*_*avi 2

使用Spark-csv 包在 Apache Spark 中将csv 文件读取为Dataframe 。将数据加载到 Dataframe 后,将 dataframe 保存到 parquetfile。

val df = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .option("mode", "DROPMALFORMED")
      .load("/home/myuser/data/log/*.csv")
df.saveAsParquetFile("/home/myuser/data.parquet")
Run Code Online (Sandbox Code Playgroud)