标签: pyarrow

用大绳子装载镶木地板

我正在处理(使用 pyspark)可能包含大字符串值的数据帧。简单来说,该列包含来自 API 的响应,该响应是大小最大为 20 MB 的字符串。通常该字符串小于 10 MB。我注意到,当字符串大小达到一定大小(大约 16 MB)时,读取这些 parquet 文件非常慢并且消耗大量内存(我的 Spark 集群会抛出 OOM)。对于 Spark 和 pyarrow 都是如此,但对于 pandas 则不然(我猜是因为pyarrow 的 cpp 实现)。

有什么办法可以让它更快。有人可以证实我的假设,即这与 java 中处理大字符串的内存分配有关吗?

以下是 parquet-inspect 的输出:

############ file meta data ############
created_by: parquet-mr version 1.12.2 (build 77e30c8093386ec52c3cfa6c34b7ef3321322c94)
num_columns: 1
num_rows: 100
num_row_groups: 1
format_version: 1.0
serialized_size: 400


############ Columns ############
api_response

############ Column(ocr_response) ############
name: ocr_response
path: ocr_response
max_definition_level: 1
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8
compression: SNAPPY (space_saved: 82%)
Run Code Online (Sandbox Code Playgroud)

java apache-spark parquet pyspark pyarrow

5
推荐指数
0
解决办法
201
查看次数

如何使用 Pandas 编写分区的 Parquet 文件

我正在尝试将 Pandas 数据帧写入分区文件:

df.to_parquet('output.parquet', engine='pyarrow', partition_cols = ['partone', 'partwo'])

TypeError: __cinit__() got an unexpected keyword argument 'partition_cols'
Run Code Online (Sandbox Code Playgroud)

从文档中我预计partition_cols将作为 kwargs 传递给 pyarrow 库。如何使用 Pandas 将分区文件写入本地磁盘?

python pandas parquet pyarrow

4
推荐指数
3
解决办法
1万
查看次数

使用 pyarrow 在 worker 之间共享对象

我想将共享 DataFrame 的只读访问权限授予由multiprocessing.Pool.map().

我想避免复制和酸洗。

我知道可以使用 pyarrow 。但是,我发现他们的文档非常繁琐。任何人都可以提供一个关于如何完成的例子吗?

python pandas python-multiprocessing pyarrow

4
推荐指数
1
解决办法
2956
查看次数

使用 delta 编码 coulmns 编写 parquet 文件

我试图用增量编码编写镶木地板文件。 此页面指出 parquet 支持三种类型的 delta 编码:

    (DELTA_BINARY_PACKED, DELTA_LENGTH_BYTE_ARRAY, DELTA_BYTE_ARRAY).
Run Code Online (Sandbox Code Playgroud)

由于spark或不允许我们指定编码方法,我很好奇如何编写启用增量编码的文件pysparkpyarrow

但是,我在互联网上发现,如果我有TimeStamp镶木地板类型的列,将使用增量编码。所以我使用以下代码来scala创建镶木地板文件。但编码不是增量。


    val df = Seq(("2018-05-01"),
                ("2018-05-02"),
                ("2018-05-03"),
                ("2018-05-04"),
                ("2018-05-05"),
                ("2018-05-06"),
                ("2018-05-07"),
                ("2018-05-08"),
                ("2018-05-09"),
                ("2018-05-10")
            ).toDF("Id")
    val df2 = df.withColumn("Timestamp", (col("Id").cast("timestamp")))
    val df3 = df2.withColumn("Date", (col("Id").cast("date")))

    df3.coalesce(1).write.format("parquet").mode("append").save("date_time2")
Run Code Online (Sandbox Code Playgroud)

parquet-tools显示有关写入的 parquet 文件的以下信息。

file schema: spark_schema 
--------------------------------------------------------------------------------
Id:          OPTIONAL BINARY L:STRING R:0 D:1
Timestamp:   OPTIONAL INT96 R:0 D:1
Date:        OPTIONAL INT32 L:DATE R:0 D:1

row group 1: RC:31 TS:1100 OFFSET:4 
--------------------------------------------------------------------------------
Id:           BINARY …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark parquet pyspark pyarrow

4
推荐指数
1
解决办法
3496
查看次数

Google BigQuery 架构与使用 load_table_from_dataframe 的数字数据类型发生冲突(pyarrow 错误)

当我将数字数据(int64 或 float64)从 Pandas 数据框上传到“数字” Google BigQuery 数据类型时,出现以下错误:

pyarrow.lib.ArrowInvalid:获得长度为 8 的字节串(预期为 16)

我尝试更改 Pandas 数据框中“tt”字段的数据类型,但没有结果:

df_data_f['tt'] = df_data_f['tt'].astype('float64')
Run Code Online (Sandbox Code Playgroud)

df_data_f['tt'] = df_data_f['tt'].astype('int64')
Run Code Online (Sandbox Code Playgroud)

使用架构:

 job_config.schema = [
                    ...             
                    bigquery.SchemaField('tt', 'NUMERIC')
                    ...]
Run Code Online (Sandbox Code Playgroud)

阅读此google-cloud-python 问题报告我得到:

数字 = pyarrow.decimal128(38, 9)

因此,“数字” Google BigQuery 数据类型使用比“float64”或“int64”更多的字节,这就是 pyarrow 无法匹配数据类型的原因。


我有:

Python 3.6.4

熊猫1.0.3

pyarrow 0.17.0

谷歌云bigquery 1.24.0

python pandas google-bigquery pyarrow

4
推荐指数
1
解决办法
1万
查看次数

如何读取箭头镶木地板键值元数据?

当我在 R 和 Python 中保存 parquet 文件(使用 pyarrow)时,我得到一个保存在元数据中的箭头模式字符串。

\n\n

如何读取元数据?是Flatbuffer编码的数据吗?模式的定义在哪里?它没有列在箭头文档网站上。

\n\n

元数据是一个键值对,如下所示

\n\n
key: "ARROW:schema"\n\nvalue: "/////5AAAAAQAAAAAAAKAAwABgAFAAgACgAAAAABAwAEAAAAyP///wQAAAABAAAAFAAAABAAGAAIAAYABwAMABAAFAAQAAAAAAABBUAAAAA4AAAAEAAAACgAAAAIAAgAAAAEAAgAAAAMAAAACAAMAAgABwA\xe2\x80\xa6\n
Run Code Online (Sandbox Code Playgroud)\n\n

用 R 写的结果

\n\n
key: "ARROW:schema"\n\nvalue: "/////5AAAAAQAAAAAAAKAAwABgAFAAgACgAAAAABAwAEAAAAyP///wQAAAABAAAAFAAAABAAGAAIAAYABwAMABAAFAAQAAAAAAABBUAAAAA4AAAAEAAAACgAAAAIAAgAAAAEAAgAAAAMAAAACAAMAAgABwA\xe2\x80\xa6\n
Run Code Online (Sandbox Code Playgroud)\n

parquet apache-arrow pyarrow

4
推荐指数
1
解决办法
1475
查看次数

AWS Athena: HIVE_BAD_DATA ERROR: Field type DOUBLE in parquet is incompatible with type defined in table schema

I use AWS Athena to query some data stored in S3, namely partitioned parquet files with pyarrow compression.

I have three columns with string values, one column called "key" with int values and one column called "result" which have both double and int values.

With those columns, I created Schema like:

create external table (
    key int,
    result double,
    location string,
    vehicle_name string.
    filename string
)
Run Code Online (Sandbox Code Playgroud)

When I queried the table, I would get

HIVE_BAD_DATA: Field results type INT64 in …

hive parquet amazon-athena pyarrow

4
推荐指数
1
解决办法
7468
查看次数

PyArrow 表:过滤行

我有一个RecordBatch来自 Plasma DataStore 的文件,我可以将其读入 apyarrow.RecordBatch或 a中pyarrow.Table。我现在尝试在将其转换为 pandas ( to_pandas) 之前过滤掉行。

有没有办法filter在 上使用新的 Dataset API(可以在 ParquetDataset 上使用)中的方法pyarrow.Table?这将使我能够使用这样的过滤器:

[[('date', '=', '2020-01-01')]]

查看源代码,pyarrow.Tablepyarrow.RecordBatch似乎都有一个过滤功能,但至少RecordBatch需要一个布尔掩码。

这可能吗?原因是数据集包含大量非零拷贝的字符串(和/或类别),因此运行to_pandas实际上会引入显着的延迟,而我每次只查找大约 20% 的数据集。

问候,
尼克拉斯

python pandas pyarrow

4
推荐指数
1
解决办法
1万
查看次数

如何使用 Pyarrow 更改 parquet 文件中的列名称?

我有数百个使用 PyArrow 创建的镶木地板文件。然而,其中一些文件的字段/列的名称(我们将其称为 Orange)与原始列(称为 Sporange)略有不同,因为其中使用了查询的变体。否则,数据(所有其他字段和所有数据)是相同的。在数据库世界中,我会执行 ALTER TABLE 并重命名列。但是,我不知道如何使用镶木地板/PyArrow 做到这一点

有没有办法重命名文件中的列,而不必重新生成或复制文件?

或者,我可以读取它(我假设是 read_table 或 ParquetFile),更改对象中的列(不确定如何执行此操作)并将其写出来吗?

我看到“rename_columns”,但不确定它是如何工作的;我尝试单独使用它,它说“rename_columns 未定义”。

rename_columns(self,names)创建新表,其中列重命名为提供的名称。

非常感谢!

parquet pyarrow

4
推荐指数
1
解决办法
8205
查看次数

如何控制 pyarrow.dataset.write_dataset 是否会覆盖以前的数据或追加到它?

我正在尝试使用pyarrow.dataset.write_dataset函数将数据写入 hdfs。但是,如果我写入已存在且包含一些数据的目录,则数据将被覆盖,而不是创建新文件。有没有一种方法可以方便地“附加”到现有的数据集,而不必先读入所有数据?我不需要将数据放在一个文件中,我只是不想删除旧的文件。

我目前所做的和不起作用的:

import pyarrow.dataset as ds
parquet_format = ds.ParquetFileFormat()
write_options = parquet_format.make_write_options(
use_deprecated_int96_timestamps = True,
coerce_timestamps = None, 
allow_truncated_timestamps = True)
ds.write_dataset(data = data, base_dir = 'my_path', filesystem = hdfs_filesystem, format = parquet_format, file_options = write_options)
Run Code Online (Sandbox Code Playgroud)

python pyarrow

4
推荐指数
1
解决办法
4347
查看次数