rap*_*l75 5 dataframe python-3.x pandas parquet pyarrow
我对 pandas 和 parquet 文件类型是全新的。我有一个 python 脚本:
然后使用 impala-shell 将 parquet 文件导入回 hdfs。
我遇到的问题似乎与步骤 2 有关。我让它在读入数据帧后以及在步骤 3 中进行任何更改之前立即打印出数据帧的内容。它似乎正在更改数据类型和数据某些字段,当将其写回 parquet 文件时会导致问题。例子:
看来它实际上正在更改这些值,因为当它写入 parquet 文件并将其导入 hdfs 并运行查询时,我收到如下错误:
WARNINGS: File '<path>/test.parquet' has an incompatible Parquet schema for column
'<database>.<table>.tport'. Column type: INT, Parquet schema:
optional double tport [i:1 d:1 r:0]
Run Code Online (Sandbox Code Playgroud)
我不知道为什么它会改变数据而不是保持原样。如果发生这种情况,我不知道是否需要循环每一列并将所有这些替换回其原始值,或者是否有其他方法告诉它不要管它们。
我一直在使用这个参考页面: http://arrow.apache.org/docs/python/parquet.html
它用
pq.read_table(in_file)
Run Code Online (Sandbox Code Playgroud)
读取镶木地板文件,然后
df = table2.to_pandas()
Run Code Online (Sandbox Code Playgroud)
转换为我可以循环并更改列的数据框。我不明白为什么它会改变数据,而且我找不到办法防止这种情况发生。我需要用比 read_table 不同的方式来读取它吗?
如果我查询数据库,数据将如下所示:
端口 |
---|
0 |
1 |
我的同一件事的 print(df) 行如下所示:
端口 |
---|
0.00000 |
南 |
南 |
1.00000 |
这是相关代码。我省略了处理命令行参数的部分,因为它很长并且不适用于这个问题。传入的文件是in_file:
import sys, getopt
import random
import re
import math
import pyarrow.parquet as pq
import numpy as np
import pandas as pd
import pyarrow as pa
import os.path
# <CLI PROCESSING SECTION HERE>
# GET LIST OF COLUMNS THAT MUST BE SCRAMBLED
field_file = open('scrambler_columns.txt', 'r')
contents = field_file.read()
scrambler_columns = contents.split('\n')
def scramble_str(xstr):
#print(xstr + '_scrambled!')
return xstr + '_scrambled!'
parquet_file = pq.ParquetFile(in_file)
table2 = pq.read_table(in_file)
metadata = pq.read_metadata(in_file)
df = table2.to_pandas() #dataframe
print('rows: ' + str(df.shape[0]))
print('cols: ' + str(df.shape[1]))
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)
pd.set_option('display.max_colwidth', -1)
pd.set_option('display.float_format', lambda x: '%.5f' % x)
#df.fillna(value='', inplace=True) # np.nan # \xa0
print(df) # print before making any changes
cols = list(df)
# https://pythonbasics.org/pandas-iterate-dataframe/
for col_name, col_data in df.iteritems():
#print(cols[index])
if col_name in scrambler_columns:
print('scrambling values in column ' + col_name)
for i, val in col_data.items():
df.at[i, col_name] = scramble_str(str(val))
print(df) # print after making changes
print(parquet_file.num_row_groups)
print(parquet_file.read_row_group(0))
# WRITE NEW PARQUET FILE
new_table = pa.Table.from_pandas(df)
writer = pq.ParquetWriter(out_file, new_table.schema)
for i in range(1):
writer.write_table(new_table)
writer.close()
if os.path.isfile(out_file) == True:
print('wrote ' + out_file)
else:
print('error writing file ' + out_file)
# READ NEW PARQUET FILE
table3 = pq.read_table(out_file)
df = table3.to_pandas() #dataframe
print(df)
Run Code Online (Sandbox Code Playgroud)
以下是 pandas 数据框中的相同内容:
id object
col1 float64
col2 object
col3 object
col4 float64
col5 object
col6 object
col7 object
Run Code Online (Sandbox Code Playgroud)
看来要转换
String to object
Int to float64
bigint to float64
Run Code Online (Sandbox Code Playgroud)
我如何告诉 pandas 列应该是什么数据类型?
编辑 2:我能够通过直接处理 pyarrow 表找到解决方法。请在此处查看我的问题和答案:How to update data in pyarrow table?
在数据库中显示为 NULL 的字段在数据帧的打印输出中被替换为字符串“None”(对于字符串列)或字符串“nan”(对于数字列)。
这是预料之中的。这就是 pandas print 函数的定义方式。
似乎将字符串转换为对象
这也是预料之中的。Numpy/pandas 没有可变长度字符串的数据类型。可以使用固定长度的字符串类型,但这非常不寻常。
似乎将 Int 转换为 float64
这也是预期的,因为该列有空值并且 numpy 的 int64 不可为空。如果你想使用 Pandas 的可为空整数列,你可以这样做......
def lookup(t):
if pa.types.is_integer(t):
return pd.Int64Dtype()
df = table.to_pandas(types_mapper=lookup)
Run Code Online (Sandbox Code Playgroud)
当然,如果您想同时使用 Int32Dtype 和 Int64Dtype,您可以创建更细粒度的查找,这只是一个入门模板。
归档时间: |
|
查看次数: |
12955 次 |
最近记录: |