小编bja*_*ck3的帖子

在(py)spark中取消持久保存所有数据帧

我是一个火花应用程序,有几点我想坚持当前的状态.这通常是在一大步之后,或缓存我想要多次使用的状态.看来,当我第二次在我的数据帧上调用缓存时,新副本会缓存到内存中.在我的应用程序中,这会在扩展时导致内存问题.即使在我当前的测试中,给定的数据帧最大约为100 MB,中间结果的累积大小也会超​​出执行程序的分配内存.请参阅下面的一个显示此行为的小示例.

cache_test.py:

from pyspark import SparkContext, HiveContext

spark_context = SparkContext(appName='cache_test')
hive_context = HiveContext(spark_context)

df = (hive_context.read
      .format('com.databricks.spark.csv')
      .load('simple_data.csv')
     )
df.cache()
df.show()

df = df.withColumn('C1+C2', df['C1'] + df['C2'])
df.cache()
df.show()

spark_context.stop()
Run Code Online (Sandbox Code Playgroud)

simple_data.csv:

1,2,3
4,5,6
7,8,9
Run Code Online (Sandbox Code Playgroud)

查看应用程序UI,有一个原始数据框的副本,与新列的副本相对应.我可以通过df.unpersist()在withColumn行之前调用来删除原始副本.这是删除缓存中间结果的推荐方法(即在每次调用之前调用unpersist cache()).

此外,是否可以清除所有缓存的对象.在我的应用程序中,有一些自然断点,我可以简单地清除所有内存,然后转到下一个文件.我想这样做而不为每个输入文件创建一个新的spark应用程序.

先感谢您!

python caching apache-spark apache-spark-sql pyspark

26
推荐指数
2
解决办法
3万
查看次数

(py)Spark中分组数据的模式

我有一个包含多列的spark DataFrame.我想基于一列对行进行分组,然后为每个组找到第二列的模式.使用pandas DataFrame,我会做这样的事情:

rand_values = np.random.randint(max_value,
                                size=num_values).reshape((num_values/2, 2))
rand_values = pd.DataFrame(rand_values, columns=['x', 'y'])
rand_values['x'] = rand_values['x'] > max_value/2
rand_values['x'] = rand_values['x'].astype('int32')

print(rand_values)
##    x  y
## 0  0  0
## 1  0  4
## 2  0  1
## 3  1  1
## 4  1  2

def mode(series):
    return scipy.stats.mode(series['y'])[0][0]

rand_values.groupby('x').apply(mode)
## x
## 0    4
## 1    1
## dtype: int64
Run Code Online (Sandbox Code Playgroud)

在pyspark中,我能够找到单列的模式

df = sql_context.createDataFrame(rand_values)

def mode_spark(df, column):
    # Group by column and count the number of occurrences
    # of …
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark spark-dataframe

6
推荐指数
1
解决办法
1万
查看次数