标签: pyarrow

使用AWS Lambda(Python 3)读取存储在S3中的Parquet文件

我正在尝试使用AWS Lambda在S3中加载,处理和编写Parquet文件。我的测试/部署过程是:

似乎有两种可能的方法,它们都在docker容器本地工作

  1. 使用s3fs的fastparquet:不幸的是,该软件包的未压缩大小大于256MB,因此我无法使用它更新Lambda代码。
  2. pyarrow与s3fs:我遵循了https://github.com/apache/arrow/pull/916,当使用lambda函数执行时,我得到了:

    • 如果我前缀S3或S3N的URI(如在代码示例):在lambda环境OSError: Passed non-file path: s3://mybucket/path/to/myfile中pyarrow / parquet.py,线848局部我得到IndexError: list index out of range在pyarrow / parquet.py,线714
    • 如果我不使用S3或S3N作为URI的前缀:它可以在本地工作(我可以读取镶木地板数据)。在Lambda环境中,我OSError: Passed non-file path: s3://mybucket/path/to/myfile在pyarrow / parquet.py的第848行中得到了相同的结果。

我的问题是:

  • 为什么在Docker容器中得到的结果与在Lambda环境中得到的结果不同?
  • 给出URI的正确方法是什么?
  • 是否可以通过AWS Lambda读取S3中的Parquet文件?

谢谢!

python amazon-s3 parquet aws-lambda pyarrow

6
推荐指数
2
解决办法
4389
查看次数

Pandas Dataframe Parquet数据类型?

我正在尝试使用Pandas和Pyarrow拼花数据。我有数百个实木复合地板文件,它们不需要具有相同的架构,但是如果各个实木复合地板之间的列匹配,则它们必须具有相同的数据类型。

我遇到的情况是生成的实木复合地板数据类型不是我想要的。例如,我可能将“ an”写入int64一列,并且生成的拼花地板将采用double格式。这会在处理方面造成很多麻烦,因为正确输入了99%的数据,但在1%的情况下,这只是错误的类型。

我试过导入numpy并以这种方式包装值-

import numpy as np

pandas.DataFrame({
  'a': [ np.int64(5100), np.int64(5200), np.int64(5300) ]
})
Run Code Online (Sandbox Code Playgroud)

但是我偶尔还是会拿到双,所以这一定是错误的方法。如何确保镶木地板文件中各列的数据类型一致?

更新-

我发现只有在列包含一个或多个Nones 时才会发生这种情况。

data_frame = pandas.DataFrame({
  'a': [ None, np.int64(5200), np.int64(5200) ]
})
Run Code Online (Sandbox Code Playgroud)

实木复合地板不能处理混合的None-int64 cols吗?

python numpy pandas parquet pyarrow

6
推荐指数
1
解决办法
1423
查看次数

使用 read_parquet 的 Parquet 文件中带有分类列的 Pandas DataFrame?

我正在将大型 CSV 文件转换为 Parquet 文件以供进一步分析。我将 CSV 数据读入 Pandas 并按dtypes如下方式指定列

_dtype = {"column_1": "float64",
          "column_2": "category",
          "column_3": "int64",
          "column_4": "int64"}

df = pd.read_csv("data.csv", dtype=_dtype)
Run Code Online (Sandbox Code Playgroud)

然后我做一些更多的数据清理并将数据写入 Parquet 以供下游使用。

_parquet_kwargs = {"engine": "pyarrow",
                   "compression": "snappy",
                   "index": False}

df.to_parquet("data.parquet", **_parquet_kwargs)
Run Code Online (Sandbox Code Playgroud)

但是当我使用 Pandas 将数据读入进行进一步分析时,from_parquet我似乎无法恢复类别 dtypes。下列

df = pd.read_parquet("data.parquet")
Run Code Online (Sandbox Code Playgroud)

结果DataFrameobjectdtypes 代替了所需的category.

以下似乎按预期工作

import pyarrow.parquet as pq

_table = (pq.ParquetFile("data.parquet")
            .read(use_pandas_metadata=True))

df = _table.to_pandas(strings_to_categorical=True)
Run Code Online (Sandbox Code Playgroud)

但是我想知道如何使用pd.read_parquet.

python-3.x pandas parquet pyarrow

6
推荐指数
1
解决办法
4680
查看次数

PyArrow:使用嵌套类型在镶木地板中存储字典列表

我想使用 PyArrow 将以下 Pandas 数据框存储在镶木地板文件中:

import pandas as pd
df = pd.DataFrame({'field': [[{}, {}]]})
Run Code Online (Sandbox Code Playgroud)

field列的类型是字典列表:

      field
0  [{}, {}]

Run Code Online (Sandbox Code Playgroud)

我首先定义相应的 PyArrow 架构:

import pyarrow as pa
schema = pa.schema([pa.field('field', pa.list_(pa.struct([])))])
Run Code Online (Sandbox Code Playgroud)

然后我使用from_pandas()

table = pa.Table.from_pandas(df, schema=schema, preserve_index=False)
Run Code Online (Sandbox Code Playgroud)

这将引发以下异常:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "table.pxi", line 930, in pyarrow.lib.Table.from_pandas
  File "/anaconda3/lib/python3.6/site-packages/pyarrow/pandas_compat.py", line 371, in dataframe_to_arrays
    convert_types)]
  File "/anaconda3/lib/python3.6/site-packages/pyarrow/pandas_compat.py", line 370, in <listcomp>
    for c, t in zip(columns_to_convert,
  File "/anaconda3/lib/python3.6/site-packages/pyarrow/pandas_compat.py", line 366, in …
Run Code Online (Sandbox Code Playgroud)

python pandas parquet pyarrow

6
推荐指数
1
解决办法
3889
查看次数

如何通过 pyarrow 使用用户定义的模式编写 Parquet

当我执行以下代码时 - 出现以下错误ValueError: Table schema does not match schema used to create file

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq


fields = [
    ('one', pa.int64()),
    ('two', pa.string(), False),
    ('three', pa.bool_())
]
schema = pa.schema(fields)

schema = schema.remove_metadata()
df = pd.DataFrame(
    {
        'one': [2, 2, 2],
        'two': ['foo', 'bar', 'baz'],
        'three': [True, False, True]
    }
)

df['two'] = df['two'].astype(str)

table = pa.Table.from_pandas(df, schema, preserve_index=False).replace_schema_metadata()
writer = pq.ParquetWriter('parquest_user_defined_schema.parquet', schema=schema)
writer.write_table(table)
Run Code Online (Sandbox Code Playgroud)

python-3.x pyarrow

6
推荐指数
1
解决办法
9773
查看次数

使用 Array&lt;Map&lt;String,String&gt;&gt; 列读取 Parquet 文件

我正在使用 Dask 读取 PySpark 生成的 Parquet 文件,其中一列是字典列表(即array<map<string,string>>')。df 的一个例子是:

import pandas as pd

df = pd.DataFrame.from_records([ 
    (1, [{'job_id': 1, 'started': '2019-07-04'}, {'job_id': 2, 'started': '2019-05-04'}], 100), 
    (5, [{'job_id': 3, 'started': '2015-06-04'}, {'job_id': 9, 'started': '2019-02-02'}], 540)], 
    columns=['uid', 'job_history', 'latency'] 
) 
Run Code Online (Sandbox Code Playgroud)

当使用 时engine='fastparquet,Dask 可以很好地读取所有其他列,但会None为具有复杂类型的列返回 s 列。当我设置时engine='pyarrow',出现以下异常:

ArrowNotImplementedError: lists with structs are not supported.
Run Code Online (Sandbox Code Playgroud)

许多谷歌搜索已经明确表明,现在并不真正支持读取带有嵌套数组的列,而且我不完全确定处理此问题的最佳方法是什么。我想我的选择是:

  • 一些如何告诉 dask/fastparquet 使用标准库解析列json。该模式很简单,如果可能的话就可以完成这项工作
  • 看看我是否可以重新运行生成输出的 Spark 作业并将其另存为其他内容,尽管这几乎不是一个可接受的解决方案,因为我的公司到处都使用镶木地板
  • 将映射的键转换为列,并使用 dtype 将数据分解为多个列,list并注意这些列中的数据通过索引相互关联/映射(例如,0这些键/列中的 idx 中的元素全部来自相同来源)。这会起作用,但坦率地说,让我心碎:(

我很想听听其他人如何克服这个限制。我的公司经常在其镶木地板中使用嵌套数组,因此我不想放弃使用 Dask。

python dask python-3.7 fastparquet pyarrow

6
推荐指数
1
解决办法
1万
查看次数

为什么分区 parquet 文件会占用更大的磁盘空间?

我正在学习使用 python 和 pyarrow 的镶木地板文件。Parquet 在压缩和最小化磁盘空间方面非常出色。snappy我的数据集是 190MB csv 文件,当保存为压缩 parquet 文件时,最终会成为单个 3MB 文件。

然而,当我将数据集保存为分区文件时,它们会导致组合大小更大(61MB)。

这是我尝试保存的示例数据集:

listing_id |     date     | gender | price
-------------------------------------------
     a     |  2019-01-01  |   M    |   100
     b     |  2019-01-02  |   M    |   100
     c     |  2019-01-03  |   F    |   200
     d     |  2019-01-04  |   F    |   200

Run Code Online (Sandbox Code Playgroud)

当我按日期(300 多个唯一值)分区时,分区文件的总大小为 61MB。每个文件都有168.2kB大小。当我按性别(2 个唯一值)分区时,分区文件的总大小仅为 3MB。

我想知道镶木地板是否有最小文件大小,这样许多小文件组合起来会消耗更大的磁盘空间?

我的环境:

- OS: Ubuntu 18.04
- Language: Python
- Library: pyarrow, pandas
Run Code Online (Sandbox Code Playgroud)

我的数据集来源:

https://www.kaggle.com/brittabettendorf/berlin-airbnb-data

# I am using calendar_summary.csv …
Run Code Online (Sandbox Code Playgroud)

python parquet pyarrow

6
推荐指数
1
解决办法
2239
查看次数

如何将超出内存容量的数据从 PostgreSQL 查询流式传输到 parquet 文件?

我有下面的代码,它查询大约 500k 行的数据库。当它击中 时,它会抛出一个 SIGKILL rows = cur.fetchall()。我尝试迭代游标而不是将其全部加载到行中,但它似乎仍然会导致 OOM 问题。

无论表的大小如何,如何从数据库中获取所有数据并将其安全地转换为 parquet 文件?

def get_parquet_for_dataset_id(self, dataset, lob, max_dt):
        query = _table_query(lob, table_name, max_dt)
        conn = self.conns[lob]

        with conn:
            with conn.cursor(cursor_factory=extras.RealDictCursor) as cur:
                cur.execute(query)

                rows = cur.fetchall()

                table = rows_to_table(rows)
                pq_bytes = io.BytesIO()
                pq.write_table(table, pq_bytes)
                _ = pq_bytes.seek(0)

                return pq_bytes;

Run Code Online (Sandbox Code Playgroud)

python psycopg2 parquet pyarrow

6
推荐指数
1
解决办法
2021
查看次数

如何更新pyarrow表中的数据?

我有一个 python 脚本,它使用 pyarrow 读取镶木地板文件。我正在尝试循环遍历表以更新其中的值。如果我尝试这个:

for col_name in table2.column_names:
    if col_name in my_columns:
        print('updating values in column '  + col_name)
        
        col_data = pa.Table.column(table2, col_name)
        
        row_ct = 1
        for i in col_data:
            pa.Table.column(table2, col_name)[row_ct] = change_str(pa.StringScalar.as_py(i))
            row_ct += 1
Run Code Online (Sandbox Code Playgroud)

我收到此错误:

 TypeError: 'pyarrow.lib.ChunkedArray' object does not support item assignment
Run Code Online (Sandbox Code Playgroud)

我如何更新这些值?

我尝试使用pandas,但它无法处理原始表中的空值,并且它还错误地转换了原始表中列的数据类型。pyarrow 有本地编辑数据的方法吗?

python-3.x pyarrow

6
推荐指数
1
解决办法
1万
查看次数

将 DataFrame 加载到 BigQuery 表时出错(pyarrow.lib.ArrowTypeError:&lt;class 'str'&gt; 类型的对象无法转换为 int)

我在 GCS 中存储了一个 CSV 文件,我想将其加载到 BigQuery 表中。但我需要先进行一些预处理,所以我将其加载到 DataFrame,然后加载到 BigQuery 表

import pandas as pd
import json 
from google.cloud import bigquery


cols_name_list = [....]. # column name in order
uri = "gs://<bucket>/<path>/<csv_file>"
df = pd.read_csv(uri, dtype="string")
df =df.reindex(columns=cols_name_list)

client = bigquery.Client()
job_config = bigquery.LoadJobConfig(
   ... # added all schema field according to table column type
)

job = client.load_table_from_dataframe(
    df, "<bq_table_id>", job_config=job_config
)
job.result()
Run Code Online (Sandbox Code Playgroud)

从上面的代码中,我对数据帧列顺序进行了重新排序,以与 BigQuery 表中的顺序相匹配(不确定这是否重要),并将所有列转换为字符串类型。

我收到此错误,如下所示

pyarrow.lib.ArrowInvalid: Could not convert '47803' with type str: tried to convert to int …
Run Code Online (Sandbox Code Playgroud)

python numpy pandas google-bigquery pyarrow

6
推荐指数
1
解决办法
2万
查看次数