小编Ива*_*дос的帖子

将Pandas数据帧转换为Spark数据帧错误

我正在尝试将Pandas DF转换为Spark.DF头:

10000001,1,0,1,12:35,OK,10002,1,0,9,f,NA,24,24,0,3,9,0,0,1,1,0,0,4,543
10000001,2,0,1,12:36,OK,10002,1,0,9,f,NA,24,24,0,3,9,2,1,1,3,1,3,2,611
10000002,1,0,4,12:19,PA,10003,1,1,7,f,NA,74,74,0,2,15,2,0,2,3,1,2,2,691
Run Code Online (Sandbox Code Playgroud)

码:

dataset = pd.read_csv("data/AS/test_v2.csv")
sc = SparkContext(conf=conf)
sqlCtx = SQLContext(sc)
sdf = sqlCtx.createDataFrame(dataset)
Run Code Online (Sandbox Code Playgroud)

我收到一个错误:

TypeError: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'>
Run Code Online (Sandbox Code Playgroud)

python pandas apache-spark spark-dataframe

32
推荐指数
5
解决办法
10万
查看次数

Spark DataFrame mapPartitions

我需要在Spark DataFrame上进行分布式计算,在DataFrame的块上调用一些任意(不是SQL)逻辑.我做了:

def some_func(df_chunk):
    pan_df = df_chunk.toPandas()
    #whatever logic here

df = sqlContext.read.parquet(...)
result = df.mapPartitions(some_func)
Run Code Online (Sandbox Code Playgroud)

不幸的是,它导致:

AttributeError:'itertools.chain'对象没有属性'toPandas'

我希望在每个地图调用中都有spark DataFrame对象,而不是'itertools.chain'.为什么?以及如何克服这一点?

python apache-spark apache-spark-sql pyspark

10
推荐指数
1
解决办法
5874
查看次数

Joblib Parallel 不会终止进程

我按以下方式并行运行代码:

grouped_data = Parallel(n_jobs=14)(delayed(function)(group) for group in grouped_data)
Run Code Online (Sandbox Code Playgroud)

计算完成后,我可以在系统监视器中看到所有生成的进程仍然处于活动状态并且消耗内存:

在此输入图像描述

并且所有这些进程都不会被杀死,直到主进程终止,这会导致内存泄漏。如果我按以下方式对 multiprocessing.Pool 执行相同操作:

pool = Pool(14)
pool.map(apply_wrapper, np.array_split(groups, 14))
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)

然后我看到所有生成的处理最终都终止并且没有内存泄漏。但是,我需要 joblib 并且它是不稳定的后端,因为它允许序列化一些本地函数。

如何强制终止 joblib.Parallel 生成的进程并释放内存?我的环境如下:Python 3.8,Ubuntu Linux。

python memory-leaks multiprocessing joblib

10
推荐指数
1
解决办法
6848
查看次数

在 Pandas 中旋转每个组

使用 Pandas 我在我的数据帧上调用了 groupby 并获得了以下内容:

>>>grouped = df.groupby(['cid'])
for key, gr in grouped:
        print(key)
        print(gr)
Out: cid  price
     121  12
     121  10
     121  9
Run Code Online (Sandbox Code Playgroud)

我想让每个组像这样旋转:

cid price1 price2 price3
121     12     10      9
Run Code Online (Sandbox Code Playgroud)

对 Pandas 执行此操作的正确方法是什么?

python pivot pandas

3
推荐指数
1
解决办法
1716
查看次数