小编Moh*_*han的帖子

Pyspark - 多列上的聚合

我有如下数据.文件名:babynames.csv.

year    name    percent     sex
1880    John    0.081541    boy
1880    William 0.080511    boy
1880    James   0.050057    boy
Run Code Online (Sandbox Code Playgroud)

我需要根据年份和性别对输入进行排序,我希望输出汇总如下(此输出将分配给新的RDD).

year    sex   avg(percentage)   count(rows)
1880    boy   0.070703         3
Run Code Online (Sandbox Code Playgroud)

我不确定如何在pyspark中执行以下步骤.需要你的帮助

testrdd = sc.textFile("babynames.csv");
rows = testrdd.map(lambda y:y.split(',')).filter(lambda x:"year" not in x[0])
aggregatedoutput = ????
Run Code Online (Sandbox Code Playgroud)

python python-2.7 apache-spark pyspark

8
推荐指数
1
解决办法
3万
查看次数

如何在 Pyspark 中高效连接一个非常大的表和一个大表

我有两张桌子。这两个表都是 Hive 中以 parquet 数据格式存储的外部表。

第一个表table_1从 2015 年开始每天有2.5 亿行。该表根据 create_date 进行分区。因此,对于每个 create_date,大约有 250M 行。

第二个表 - table_2是每日增量表,平均行数约为150 万行。

两个表中有一个公共列“lookup_id”。现在,我需要使用数据帧从 table_1 中获取所有列,以获取 table_2 中的增量数据。

我想做如下的事情

table_1=spark.table("table_1")
table_2=spark.table("table_2")
result_df=table_1.join(table_2, table_1.lookup_id=table_2.lookup_id, "inner").drop(table_2.lookup_id)
Run Code Online (Sandbox Code Playgroud)

但我怀疑这是否真的有效,以及 pyspark 是否能够在没有任何内存错误的情况下处理这个问题。

问题1: 如何基于create_date分区并行化table_1扫描?

问题 2: 是否有其他方法可以基于 table_2 中的lookup_ids 和/或基于分区来优化 table_1 扫描?

其他信息可以让我更清楚地了解我正在寻找的内容:

我试图了解当我们使用数据帧连接表时,spark 是否读取数据并将其保存在内存中并连接它们,或者它只是在读取自身时连接。如果第二个为 true,则第二个语句适用于哪些连接。另外,如果需要使用循环来避免任何内存错误。

apache-spark apache-spark-sql pyspark

5
推荐指数
1
解决办法
2万
查看次数