Spark数据框随机拆分

Bak*_*war 13 python apache-spark pyspark

我有一个火花数据框架,我想分为火车,验证和测试,比例为0.60,0.20,0.20.

我使用以下代码:

def data_split(x):
    global data_map_var
    d_map = data_map_var.value
    data_row = x.asDict()
    import random
    rand = random.uniform(0.0,1.0)
    ret_list = ()
    if rand <= 0.6:
        ret_list = (data_row['TRANS'] , d_map[data_row['ITEM']] , data_row['Ratings'] , 'train')
    elif rand <=0.8:
        ret_list = (data_row['TRANS'] , d_map[data_row['ITEM']] , data_row['Ratings'] , 'test')
    else:
        ret_list = (data_row['TRANS'] , d_map[data_row['ITEM']] , data_row['Ratings'] , 'validation')
    return ret_list
?
?
split_sdf = ratings_sdf.map(data_split)
train_sdf = split_sdf.filter(lambda x : x[-1] == 'train').map(lambda x :(x[0],x[1],x[2]))
test_sdf = split_sdf.filter(lambda x : x[-1] == 'test').map(lambda x :(x[0],x[1],x[2]))
validation_sdf = split_sdf.filter(lambda x : x[-1] == 'validation').map(lambda x :(x[0],x[1],x[2]))
?
print "Total Records in Original Ratings RDD is {}".format(split_sdf.count())
?
print "Total Records in training data RDD is {}".format(train_sdf.count())
?
print "Total Records in validation data RDD is {}".format(validation_sdf.count())
?
print "Total Records in test data RDD is {}".format(test_sdf.count())
?
?
#help(ratings_sdf)
Total Records in Original Ratings RDD is 300001
Total Records in training data RDD is 180321
Total Records in validation data RDD is 59763
Total Records in test data RDD is 59837
Run Code Online (Sandbox Code Playgroud)

我的原始数据框是ratings_sdf,我用它来传递一个执行拆分的映射器函数.

如果您检查列车的总和,验证和测试不总和拆分(原始评级)计数.这些数字在每次运行代码时都会发生变化.

剩下的记录在哪里以及为什么总和不相等?

use*_*411 24

TL; DR如果要拆分DataFrame使用randomSplit方法:

ratings_sdf.randomSplit([0.6, 0.2, 0.2])
Run Code Online (Sandbox Code Playgroud)

您的代码在多个级别上都是错误的,但有两个基本问题会导致它无法修复:

  • Spark变换可以被任意次数评估,​​你使用的函数应该是refereically透明和副作用.您的代码会split_sdf多次评估并使用有状态RNG,data_split因此每次结果都不同.

    这会导致您描述每个孩子看到父RDD的不同状态的行为.

  • 您没有正确初始化RNG,因此您获得的随机值不是独立的.

  • @vipin除了在每个项目上调用`count`之外?并不是的。但是,如果您有11条记录,那么使用Spark并没有多大意义,如果您要进行特定的分发,则始终可以拆分本地数据并在以后对其进行并行化。对于“实际”大小的数据,通常不成问题-[考虑到`randomSplit`的工作方式](/sf/ask/2305320041/)您不太可能会得到具有相对较大且平衡分数的空拆分像这些.. (2认同)