Mul*_*ard 7 python pandas apache-spark apache-arrow pyarrow
我正在处理熊猫和Spark数据帧。数据帧始终很大(> 20 GB),而标准的火花功能不足以容纳这些大小。目前,我将我的熊猫数据框转换为火花数据框,如下所示:
dataframe = spark.createDataFrame(pandas_dataframe)
Run Code Online (Sandbox Code Playgroud)
我进行这种转换是因为通过火花将数据帧写入hdfs非常容易:
dataframe.write.parquet(output_uri, mode="overwrite", compression="snappy")
Run Code Online (Sandbox Code Playgroud)
但是,对于大于2 GB的数据帧,转换失败。如果将spark数据框转换为熊猫,则可以使用pyarrow:
// temporary write spark dataframe to hdfs
dataframe.write.parquet(path, mode="overwrite", compression="snappy")
// open hdfs connection using pyarrow (pa)
hdfs = pa.hdfs.connect("default", 0)
// read parquet (pyarrow.parquet (pq))
parquet = pq.ParquetDataset(path_hdfs, filesystem=hdfs)
table = parquet.read(nthreads=4)
// transform table to pandas
pandas = table.to_pandas(nthreads=4)
// delete temp files
hdfs.delete(path, recursive=True)
Run Code Online (Sandbox Code Playgroud)
这是从Spark到Pandas的快速会话,它也适用于大于2 GB的数据帧。我还找不到其他方法可以做到这一点。意思是有一个熊猫数据框,我在pyarrow的帮助下将其转换为火花。问题是我真的找不到如何将熊猫数据帧写入hdfs。
我的熊猫版本:0.19.0
意思是有一个熊猫数据框,我在pyarrow的帮助下将其转换为火花。
pyarrow.Table.fromPandas 是您要寻找的功能:
Run Code Online (Sandbox Code Playgroud)Table.from_pandas(type cls, df, bool timestamps_to_ms=False, Schema schema=None, bool preserve_index=True) Convert pandas.DataFrame to an Arrow Table
import pyarrow as pa
pdf = ... # type: pandas.core.frame.DataFrame
adf = pa.Table.from_pandas(pdf) # type: pyarrow.lib.Table
Run Code Online (Sandbox Code Playgroud)
结果可以直接写入Parquet / HDFS,而无需通过Spark传递数据:
import pyarrow.parquet as pq
fs = pa.hdfs.connect()
with fs.open(path, "wb") as fw
pq.write_table(adf, fw)
Run Code Online (Sandbox Code Playgroud)
也可以看看
pyarrow文档中读写Apache Parquet格式。火花笔记:
此外,由于createDataFrame(SPARK-20791-使用Apache Arrow从Pandas.DataFrame改进Spark createDataFrame)直接支持Spark 2.3(当前主版本)Arrow 。它用于SparkContext.defaultParallelism计算块数,因此您可以轻松控制单个批次的大小。
最后,defaultParallelism可用于控制使用标准生成的分区数量,从而_convert_from_pandas有效地将片的大小减小到更易于管理的程度。
不幸的是,这些不太可能解决您当前的内存问题。两者都依赖parallelize,因此将所有数据存储在驱动程序节点的内存中。切换到箭头或调整配置只能加快过程或地址块大小的限制。
实际上,只要您使用本地熊猫DataFrame作为输入,我就看不出有任何理由在这里切换到Spark 。在这种情况下,最严重的瓶颈是驱动程序的网络I / O,而分发数据将无法解决该问题。
| 归档时间: |
|
| 查看次数: |
9058 次 |
| 最近记录: |