我正在处理(使用 pyspark)可能包含大字符串值的数据帧。简单来说,该列包含来自 API 的响应,该响应是大小最大为 20 MB 的字符串。通常该字符串小于 10 MB。我注意到,当字符串大小达到一定大小(大约 16 MB)时,读取这些 parquet 文件非常慢并且消耗大量内存(我的 Spark 集群会抛出 OOM)。对于 Spark 和 pyarrow 都是如此,但对于 pandas 则不然(我猜是因为pyarrow 的 cpp 实现)。
有什么办法可以让它更快。有人可以证实我的假设,即这与 java 中处理大字符串的内存分配有关吗?
以下是 parquet-inspect 的输出:
############ file meta data ############
created_by: parquet-mr version 1.12.2 (build 77e30c8093386ec52c3cfa6c34b7ef3321322c94)
num_columns: 1
num_rows: 100
num_row_groups: 1
format_version: 1.0
serialized_size: 400
############ Columns ############
api_response
############ Column(ocr_response) ############
name: ocr_response
path: ocr_response
max_definition_level: 1
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8
compression: SNAPPY (space_saved: 82%)
Run Code Online (Sandbox Code Playgroud) 我正在尝试将 Pandas 数据帧写入分区文件:
df.to_parquet('output.parquet', engine='pyarrow', partition_cols = ['partone', 'partwo'])
TypeError: __cinit__() got an unexpected keyword argument 'partition_cols'
Run Code Online (Sandbox Code Playgroud)
从文档中我预计partition_cols将作为 kwargs 传递给 pyarrow 库。如何使用 Pandas 将分区文件写入本地磁盘?
我想将共享 DataFrame 的只读访问权限授予由multiprocessing.Pool.map().
我想避免复制和酸洗。
我知道可以使用 pyarrow 。但是,我发现他们的文档非常繁琐。任何人都可以提供一个关于如何完成的例子吗?
我试图用增量编码编写镶木地板文件。 此页面指出 parquet 支持三种类型的 delta 编码:
(DELTA_BINARY_PACKED, DELTA_LENGTH_BYTE_ARRAY, DELTA_BYTE_ARRAY).
Run Code Online (Sandbox Code Playgroud)
由于spark或不允许我们指定编码方法,我很好奇如何编写启用增量编码的文件pyspark?pyarrow
但是,我在互联网上发现,如果我有TimeStamp镶木地板类型的列,将使用增量编码。所以我使用以下代码来scala创建镶木地板文件。但编码不是增量。
val df = Seq(("2018-05-01"),
("2018-05-02"),
("2018-05-03"),
("2018-05-04"),
("2018-05-05"),
("2018-05-06"),
("2018-05-07"),
("2018-05-08"),
("2018-05-09"),
("2018-05-10")
).toDF("Id")
val df2 = df.withColumn("Timestamp", (col("Id").cast("timestamp")))
val df3 = df2.withColumn("Date", (col("Id").cast("date")))
df3.coalesce(1).write.format("parquet").mode("append").save("date_time2")
Run Code Online (Sandbox Code Playgroud)
parquet-tools显示有关写入的 parquet 文件的以下信息。
file schema: spark_schema
--------------------------------------------------------------------------------
Id: OPTIONAL BINARY L:STRING R:0 D:1
Timestamp: OPTIONAL INT96 R:0 D:1
Date: OPTIONAL INT32 L:DATE R:0 D:1
row group 1: RC:31 TS:1100 OFFSET:4
--------------------------------------------------------------------------------
Id: BINARY …Run Code Online (Sandbox Code Playgroud) 当我将数字数据(int64 或 float64)从 Pandas 数据框上传到“数字” Google BigQuery 数据类型时,出现以下错误:
pyarrow.lib.ArrowInvalid:获得长度为 8 的字节串(预期为 16)
我尝试更改 Pandas 数据框中“tt”字段的数据类型,但没有结果:
df_data_f['tt'] = df_data_f['tt'].astype('float64')
Run Code Online (Sandbox Code Playgroud)
和
df_data_f['tt'] = df_data_f['tt'].astype('int64')
Run Code Online (Sandbox Code Playgroud)
使用架构:
job_config.schema = [
...
bigquery.SchemaField('tt', 'NUMERIC')
...]
Run Code Online (Sandbox Code Playgroud)
阅读此google-cloud-python 问题报告我得到:
数字 = pyarrow.decimal128(38, 9)
因此,“数字” Google BigQuery 数据类型使用比“float64”或“int64”更多的字节,这就是 pyarrow 无法匹配数据类型的原因。
我有:
Python 3.6.4
熊猫1.0.3
pyarrow 0.17.0
谷歌云bigquery 1.24.0
当我在 R 和 Python 中保存 parquet 文件(使用 pyarrow)时,我得到一个保存在元数据中的箭头模式字符串。
\n\n如何读取元数据?是Flatbuffer编码的数据吗?模式的定义在哪里?它没有列在箭头文档网站上。
\n\n元数据是一个键值对,如下所示
\n\nkey: "ARROW:schema"\n\nvalue: "/////5AAAAAQAAAAAAAKAAwABgAFAAgACgAAAAABAwAEAAAAyP///wQAAAABAAAAFAAAABAAGAAIAAYABwAMABAAFAAQAAAAAAABBUAAAAA4AAAAEAAAACgAAAAIAAgAAAAEAAgAAAAMAAAACAAMAAgABwA\xe2\x80\xa6\nRun Code Online (Sandbox Code Playgroud)\n\n用 R 写的结果
\n\nkey: "ARROW:schema"\n\nvalue: "/////5AAAAAQAAAAAAAKAAwABgAFAAgACgAAAAABAwAEAAAAyP///wQAAAABAAAAFAAAABAAGAAIAAYABwAMABAAFAAQAAAAAAABBUAAAAA4AAAAEAAAACgAAAAIAAgAAAAEAAgAAAAMAAAACAAMAAgABwA\xe2\x80\xa6\nRun Code Online (Sandbox Code Playgroud)\n I use AWS Athena to query some data stored in S3, namely partitioned parquet files with pyarrow compression.
I have three columns with string values, one column called "key" with int values and one column called "result" which have both double and int values.
With those columns, I created Schema like:
create external table (
key int,
result double,
location string,
vehicle_name string.
filename string
)
Run Code Online (Sandbox Code Playgroud)
When I queried the table, I would get
HIVE_BAD_DATA: Field results type INT64 in …
我有一个RecordBatch来自 Plasma DataStore 的文件,我可以将其读入 apyarrow.RecordBatch或 a中pyarrow.Table。我现在尝试在将其转换为 pandas ( to_pandas) 之前过滤掉行。
有没有办法filter在 上使用新的 Dataset API(可以在 ParquetDataset 上使用)中的方法pyarrow.Table?这将使我能够使用这样的过滤器:
[[('date', '=', '2020-01-01')]]
查看源代码,pyarrow.Table和pyarrow.RecordBatch似乎都有一个过滤功能,但至少RecordBatch需要一个布尔掩码。
这可能吗?原因是数据集包含大量非零拷贝的字符串(和/或类别),因此运行to_pandas实际上会引入显着的延迟,而我每次只查找大约 20% 的数据集。
问候,
尼克拉斯
我有数百个使用 PyArrow 创建的镶木地板文件。然而,其中一些文件的字段/列的名称(我们将其称为 Orange)与原始列(称为 Sporange)略有不同,因为其中使用了查询的变体。否则,数据(所有其他字段和所有数据)是相同的。在数据库世界中,我会执行 ALTER TABLE 并重命名列。但是,我不知道如何使用镶木地板/PyArrow 做到这一点
有没有办法重命名文件中的列,而不必重新生成或复制文件?
或者,我可以读取它(我假设是 read_table 或 ParquetFile),更改对象中的列(不确定如何执行此操作)并将其写出来吗?
我看到“rename_columns”,但不确定它是如何工作的;我尝试单独使用它,它说“rename_columns 未定义”。
rename_columns(self,names)创建新表,其中列重命名为提供的名称。
非常感谢!
我正在尝试使用pyarrow.dataset.write_dataset函数将数据写入 hdfs。但是,如果我写入已存在且包含一些数据的目录,则数据将被覆盖,而不是创建新文件。有没有一种方法可以方便地“附加”到现有的数据集,而不必先读入所有数据?我不需要将数据放在一个文件中,我只是不想删除旧的文件。
我目前所做的和不起作用的:
import pyarrow.dataset as ds
parquet_format = ds.ParquetFileFormat()
write_options = parquet_format.make_write_options(
use_deprecated_int96_timestamps = True,
coerce_timestamps = None,
allow_truncated_timestamps = True)
ds.write_dataset(data = data, base_dir = 'my_path', filesystem = hdfs_filesystem, format = parquet_format, file_options = write_options)
Run Code Online (Sandbox Code Playgroud) pyarrow ×10
parquet ×6
python ×5
pandas ×4
apache-spark ×2
pyspark ×2
apache-arrow ×1
hive ×1
java ×1
scala ×1