tes*_*acc 7 python pandas apache-spark
我有一个 1000 万条记录数据框。我的要求是我需要对 Pandas 中的这些数据进行一些操作,而且我没有一次将所有 1000 万条记录放入 Pandas 的内存。所以我希望能够将它分块并在每个块上使用 toPandas
df = sqlContext.sql("select * from db.table")
#do chunking to take X records at a time
#how do I generated chunked_df?
p_df = chunked_df.toPandas()
#do things to p_df
Run Code Online (Sandbox Code Playgroud)
我如何将我的数据帧分成相等的 x 部分或按记录计数分成几部分,比如一次 100 万。任何一种解决方案都是可以接受的,我只需要以较小的块处理它。
一种选择是toLocalIterator与repartition和结合使用mapPartitions。
import pandas as pd
columns = spark_df.schema.fieldNames()
chunks = spark_df.repartition(num_chunks).rdd.mapPartitions(lambda iterator: [pd.DataFrame(list(iterator), columns=columns)]).toLocalIterator()
for pdf in chunks:
# do work locally on chunk as pandas df
Run Code Online (Sandbox Code Playgroud)
通过使用toLocalIterator,一次只能将一个分区收集到驱动程序中。
另一种在我看来更可取的选择是将您的工作分布在每个分区中的 Pandas 块上的集群中。这可以使用pandas_udf:
from pyspark.sql.functions import spark_partition_id, pandas_udf, PandasUDFType
@pandas_udf(result_schema, PandasUDFType.GROUPED_MAP)
def transform_pandas_df_chunk(pdf):
result_pdf = ...
# do ditributed work on a chunk of the original spark dataframe as a pandas dataframe
return result_pdf
spark_df = spark_df.repartition(num_chunks).groupby(spark_partition_id()).apply(transform_pandas_df_chunk)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
4916 次 |
| 最近记录: |