我正在尝试使用AWS Lambda在S3中加载,处理和编写Parquet文件。我的测试/部署过程是:
似乎有两种可能的方法,它们都在docker容器本地工作:
pyarrow与s3fs:我遵循了https://github.com/apache/arrow/pull/916,当使用lambda函数执行时,我得到了:
OSError: Passed non-file path: s3://mybucket/path/to/myfile中pyarrow / parquet.py,线848局部我得到IndexError: list index out of range在pyarrow / parquet.py,线714OSError: Passed non-file path: s3://mybucket/path/to/myfile在pyarrow / parquet.py的第848行中得到了相同的结果。 我的问题是:
谢谢!
我正在尝试使用Pandas和Pyarrow拼花数据。我有数百个实木复合地板文件,它们不需要具有相同的架构,但是如果各个实木复合地板之间的列匹配,则它们必须具有相同的数据类型。
我遇到的情况是生成的实木复合地板数据类型不是我想要的。例如,我可能将“ an”写入int64一列,并且生成的拼花地板将采用double格式。这会在处理方面造成很多麻烦,因为正确输入了99%的数据,但在1%的情况下,这只是错误的类型。
我试过导入numpy并以这种方式包装值-
import numpy as np
pandas.DataFrame({
'a': [ np.int64(5100), np.int64(5200), np.int64(5300) ]
})
Run Code Online (Sandbox Code Playgroud)
但是我偶尔还是会拿到双,所以这一定是错误的方法。如何确保镶木地板文件中各列的数据类型一致?
更新-
我发现只有在列包含一个或多个Nones 时才会发生这种情况。
data_frame = pandas.DataFrame({
'a': [ None, np.int64(5200), np.int64(5200) ]
})
Run Code Online (Sandbox Code Playgroud)
实木复合地板不能处理混合的None-int64 cols吗?
我正在将大型 CSV 文件转换为 Parquet 文件以供进一步分析。我将 CSV 数据读入 Pandas 并按dtypes如下方式指定列
_dtype = {"column_1": "float64",
"column_2": "category",
"column_3": "int64",
"column_4": "int64"}
df = pd.read_csv("data.csv", dtype=_dtype)
Run Code Online (Sandbox Code Playgroud)
然后我做一些更多的数据清理并将数据写入 Parquet 以供下游使用。
_parquet_kwargs = {"engine": "pyarrow",
"compression": "snappy",
"index": False}
df.to_parquet("data.parquet", **_parquet_kwargs)
Run Code Online (Sandbox Code Playgroud)
但是当我使用 Pandas 将数据读入进行进一步分析时,from_parquet我似乎无法恢复类别 dtypes。下列
df = pd.read_parquet("data.parquet")
Run Code Online (Sandbox Code Playgroud)
结果DataFrame用objectdtypes 代替了所需的category.
以下似乎按预期工作
import pyarrow.parquet as pq
_table = (pq.ParquetFile("data.parquet")
.read(use_pandas_metadata=True))
df = _table.to_pandas(strings_to_categorical=True)
Run Code Online (Sandbox Code Playgroud)
但是我想知道如何使用pd.read_parquet.
我想使用 PyArrow 将以下 Pandas 数据框存储在镶木地板文件中:
import pandas as pd
df = pd.DataFrame({'field': [[{}, {}]]})
Run Code Online (Sandbox Code Playgroud)
field列的类型是字典列表:
field
0 [{}, {}]
Run Code Online (Sandbox Code Playgroud)
我首先定义相应的 PyArrow 架构:
import pyarrow as pa
schema = pa.schema([pa.field('field', pa.list_(pa.struct([])))])
Run Code Online (Sandbox Code Playgroud)
然后我使用from_pandas():
table = pa.Table.from_pandas(df, schema=schema, preserve_index=False)
Run Code Online (Sandbox Code Playgroud)
这将引发以下异常:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "table.pxi", line 930, in pyarrow.lib.Table.from_pandas
File "/anaconda3/lib/python3.6/site-packages/pyarrow/pandas_compat.py", line 371, in dataframe_to_arrays
convert_types)]
File "/anaconda3/lib/python3.6/site-packages/pyarrow/pandas_compat.py", line 370, in <listcomp>
for c, t in zip(columns_to_convert,
File "/anaconda3/lib/python3.6/site-packages/pyarrow/pandas_compat.py", line 366, in …Run Code Online (Sandbox Code Playgroud) 当我执行以下代码时 - 出现以下错误ValueError: Table schema does not match schema used to create file。
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
fields = [
('one', pa.int64()),
('two', pa.string(), False),
('three', pa.bool_())
]
schema = pa.schema(fields)
schema = schema.remove_metadata()
df = pd.DataFrame(
{
'one': [2, 2, 2],
'two': ['foo', 'bar', 'baz'],
'three': [True, False, True]
}
)
df['two'] = df['two'].astype(str)
table = pa.Table.from_pandas(df, schema, preserve_index=False).replace_schema_metadata()
writer = pq.ParquetWriter('parquest_user_defined_schema.parquet', schema=schema)
writer.write_table(table)
Run Code Online (Sandbox Code Playgroud) 我正在使用 Dask 读取 PySpark 生成的 Parquet 文件,其中一列是字典列表(即array<map<string,string>>')。df 的一个例子是:
import pandas as pd
df = pd.DataFrame.from_records([
(1, [{'job_id': 1, 'started': '2019-07-04'}, {'job_id': 2, 'started': '2019-05-04'}], 100),
(5, [{'job_id': 3, 'started': '2015-06-04'}, {'job_id': 9, 'started': '2019-02-02'}], 540)],
columns=['uid', 'job_history', 'latency']
)
Run Code Online (Sandbox Code Playgroud)
当使用 时engine='fastparquet,Dask 可以很好地读取所有其他列,但会None为具有复杂类型的列返回 s 列。当我设置时engine='pyarrow',出现以下异常:
ArrowNotImplementedError: lists with structs are not supported.
Run Code Online (Sandbox Code Playgroud)
许多谷歌搜索已经明确表明,现在并不真正支持读取带有嵌套数组的列,而且我不完全确定处理此问题的最佳方法是什么。我想我的选择是:
json。该模式很简单,如果可能的话就可以完成这项工作list并注意这些列中的数据通过索引相互关联/映射(例如,0这些键/列中的 idx 中的元素全部来自相同来源)。这会起作用,但坦率地说,让我心碎:(我很想听听其他人如何克服这个限制。我的公司经常在其镶木地板中使用嵌套数组,因此我不想放弃使用 Dask。
我正在学习使用 python 和 pyarrow 的镶木地板文件。Parquet 在压缩和最小化磁盘空间方面非常出色。snappy我的数据集是 190MB csv 文件,当保存为压缩 parquet 文件时,最终会成为单个 3MB 文件。
然而,当我将数据集保存为分区文件时,它们会导致组合大小更大(61MB)。
这是我尝试保存的示例数据集:
listing_id | date | gender | price
-------------------------------------------
a | 2019-01-01 | M | 100
b | 2019-01-02 | M | 100
c | 2019-01-03 | F | 200
d | 2019-01-04 | F | 200
Run Code Online (Sandbox Code Playgroud)
当我按日期(300 多个唯一值)分区时,分区文件的总大小为 61MB。每个文件都有168.2kB大小。当我按性别(2 个唯一值)分区时,分区文件的总大小仅为 3MB。
我想知道镶木地板是否有最小文件大小,这样许多小文件组合起来会消耗更大的磁盘空间?
我的环境:
- OS: Ubuntu 18.04
- Language: Python
- Library: pyarrow, pandas
Run Code Online (Sandbox Code Playgroud)
我的数据集来源:
https://www.kaggle.com/brittabettendorf/berlin-airbnb-data
# I am using calendar_summary.csv …Run Code Online (Sandbox Code Playgroud) 我有下面的代码,它查询大约 500k 行的数据库。当它击中 时,它会抛出一个 SIGKILL rows = cur.fetchall()。我尝试迭代游标而不是将其全部加载到行中,但它似乎仍然会导致 OOM 问题。
无论表的大小如何,如何从数据库中获取所有数据并将其安全地转换为 parquet 文件?
def get_parquet_for_dataset_id(self, dataset, lob, max_dt):
query = _table_query(lob, table_name, max_dt)
conn = self.conns[lob]
with conn:
with conn.cursor(cursor_factory=extras.RealDictCursor) as cur:
cur.execute(query)
rows = cur.fetchall()
table = rows_to_table(rows)
pq_bytes = io.BytesIO()
pq.write_table(table, pq_bytes)
_ = pq_bytes.seek(0)
return pq_bytes;
Run Code Online (Sandbox Code Playgroud) 我有一个 python 脚本,它使用 pyarrow 读取镶木地板文件。我正在尝试循环遍历表以更新其中的值。如果我尝试这个:
for col_name in table2.column_names:
if col_name in my_columns:
print('updating values in column ' + col_name)
col_data = pa.Table.column(table2, col_name)
row_ct = 1
for i in col_data:
pa.Table.column(table2, col_name)[row_ct] = change_str(pa.StringScalar.as_py(i))
row_ct += 1
Run Code Online (Sandbox Code Playgroud)
我收到此错误:
TypeError: 'pyarrow.lib.ChunkedArray' object does not support item assignment
Run Code Online (Sandbox Code Playgroud)
我如何更新这些值?
我尝试使用pandas,但它无法处理原始表中的空值,并且它还错误地转换了原始表中列的数据类型。pyarrow 有本地编辑数据的方法吗?
我在 GCS 中存储了一个 CSV 文件,我想将其加载到 BigQuery 表中。但我需要先进行一些预处理,所以我将其加载到 DataFrame,然后加载到 BigQuery 表
import pandas as pd
import json
from google.cloud import bigquery
cols_name_list = [....]. # column name in order
uri = "gs://<bucket>/<path>/<csv_file>"
df = pd.read_csv(uri, dtype="string")
df =df.reindex(columns=cols_name_list)
client = bigquery.Client()
job_config = bigquery.LoadJobConfig(
... # added all schema field according to table column type
)
job = client.load_table_from_dataframe(
df, "<bq_table_id>", job_config=job_config
)
job.result()
Run Code Online (Sandbox Code Playgroud)
从上面的代码中,我对数据帧列顺序进行了重新排序,以与 BigQuery 表中的顺序相匹配(不确定这是否重要),并将所有列转换为字符串类型。
我收到此错误,如下所示
pyarrow.lib.ArrowInvalid: Could not convert '47803' with type str: tried to convert to int …Run Code Online (Sandbox Code Playgroud) pyarrow ×10
python ×7
parquet ×6
pandas ×4
python-3.x ×3
numpy ×2
amazon-s3 ×1
aws-lambda ×1
dask ×1
fastparquet ×1
psycopg2 ×1
python-3.7 ×1