相关疑难解决方法(0)

在Spark DataFrame中查找每个组的最大行数

我正在尝试使用Spark数据帧而不是RDD,因为它们看起来比RDD更高级,并且往往会产生更易读的代码.

在一个14节点的Google Dataproc集群中,我有大约6百万个名称被两个不同的系统转换为ID:sasb.每个Row包含name,id_said_sb.我的目标是从生产映射id_said_sb使得对于每id_sa时,相应的id_sb是连接到所有名称中最常见的ID id_sa.

让我们试着用一个例子来澄清.如果我有以下行:

[Row(name='n1', id_sa='a1', id_sb='b1'),
 Row(name='n2', id_sa='a1', id_sb='b2'),
 Row(name='n3', id_sa='a1', id_sb='b2'),
 Row(name='n4', id_sa='a2', id_sb='b2')]
Run Code Online (Sandbox Code Playgroud)

我的目标是从生产映射a1b2.事实上,相关的名称a1n1,n2n3,分别映射b1,b2b2,因此b2是相关联的名称最常见的映射a1.以同样的方式,a2将映射到b2.可以假设总有一个胜利者:不需要打破关系.

我希望我可以使用groupBy(df.id_sa)我的数据帧,但我不知道接下来该做什么.我希望最终会产生以下行的聚合:

[Row(id_sa=a1, max_id_sb=b2),
 Row(id_sa=a2, max_id_sb=b2)]
Run Code Online (Sandbox Code Playgroud)

但也许我正在尝试使用错误的工具,我应该回到使用RDD.

apache-spark apache-spark-sql pyspark

42
推荐指数
2
解决办法
5万
查看次数

DataFrame/Dataset groupBy行为/优化

假设我们有DataFrame,df包含以下列:

名称,姓氏,大小,宽度,长度,重量

现在我们想要执行几个操作,例如我们想要创建一些包含Size和Width数据的DataFrame.

val df1 = df.groupBy("surname").agg( sum("size") )
val df2 = df.groupBy("surname").agg( sum("width") )
Run Code Online (Sandbox Code Playgroud)

正如您所注意到的,其他列(如Length)不会在任何地方使用.Spark是否足够聪明,可以在洗牌阶段之前丢弃多余的列,还是随身携带?威尔跑:

val dfBasic = df.select("surname", "size", "width")
Run Code Online (Sandbox Code Playgroud)

在分组之前以某种方式影响性能?

performance dataframe apache-spark apache-spark-sql apache-spark-dataset

28
推荐指数
1
解决办法
1万
查看次数

如何最大化值并保留所有列(每组最大记录数)?

给定以下DataFrame:

+----+-----+---+-----+
| uid|    k|  v|count|
+----+-----+---+-----+
|   a|pref1|  b|  168|
|   a|pref3|  h|  168|
|   a|pref3|  t|   63|
|   a|pref3|  k|   84|
|   a|pref1|  e|   84|
|   a|pref2|  z|  105|
+----+-----+---+-----+
Run Code Online (Sandbox Code Playgroud)

我怎样才能获得最大值uid,k但包括v

+----+-----+---+----------+
| uid|    k|  v|max(count)|
+----+-----+---+----------+
|   a|pref1|  b|       168|
|   a|pref3|  h|       168|
|   a|pref2|  z|       105|
+----+-----+---+----------+
Run Code Online (Sandbox Code Playgroud)

我可以做这样的事情,但它会删除列"v":

df.groupBy("uid", "k").max("count")
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql

13
推荐指数
3
解决办法
1万
查看次数

如何在Spark 1.6中使用窗口聚合中的collect_set和collect_list函数?

在Spark 1.6.0/Scala中,是否有机会获得collect_list("colC")collect_set("colC").over(Window.partitionBy("colA").orderBy("colB")

scala apache-spark apache-spark-sql apache-spark-1.6

12
推荐指数
1
解决办法
3万
查看次数

如何在SparkSQL中使用Dataframe获取行的迭代器

我在SparkSQL中有一个应用程序返回大量非常难以适应内存的行,因此我无法在DataFrame上使用collect函数,是否有一种方法可以将所有这些行作为Iterable instaed of整个行作为列表.

注意:我正在使用yarn-client执行此SparkSQL应用程序

apache-spark apache-spark-sql apache-spark-1.3

7
推荐指数
1
解决办法
5709
查看次数

如何在组中找到第一个非空值?(使用数据集api进行二级排序)

我正在研究一个代表事件流的数据集(比如从网站上发布的跟踪事件).所有活动都有时间戳.我们经常遇到的一个用例是尝试找到给定字段的第一个非空值.因此,举例来说,最让我们感受到的是:

val eventsDf = spark.read.json(jsonEventsPath) 

case class ProjectedFields(visitId: String, userId: Int, timestamp: Long ... )

val projectedEventsDs = eventsDf.select(
    eventsDf("message.visit.id").alias("visitId"),
    eventsDf("message.property.user_id").alias("userId"),
    eventsDf("message.property.timestamp"),

    ...

).as[ProjectedFields]

projectedEventsDs.groupBy($"visitId").agg(first($"userId", true))
Run Code Online (Sandbox Code Playgroud)

上述代码的问题first在于无法保证馈送到该聚合函数的数据的顺序.我希望它被排序timestamp以确保它是时间戳的第一个非null userId而不是任何随机的非null userId.

有没有办法在分组中定义排序?

使用Spark 2.10


BTW,在SPARK DataFrame中为Spark 2.10建议的方式:选择每个组的第一行是在分组之前进行排序 - 这不起作用.例如,以下代码:

case class OrderedKeyValue(key: String, value: String, ordering: Int)
val ds = Seq(
  OrderedKeyValue("a", null, 1), 
  OrderedKeyValue("a", null, 2), 
  OrderedKeyValue("a", "x", 3), 
  OrderedKeyValue("a", "y", 4), 
  OrderedKeyValue("a", null, 5)
).toDS()

ds.orderBy("ordering").groupBy("key").agg(first("value", true)).collect()
Run Code Online (Sandbox Code Playgroud)

有时会返回Array([a,y]),有时Array([a,x])

apache-spark apache-spark-sql apache-spark-dataset

7
推荐指数
1
解决办法
2452
查看次数

线程 # 将 _ GB 的排序数据溢出到磁盘

我正在尝试编写一个 ETL 过程,在联合之前合并两个数据集,我向每个数据集添加一列,较新的数据集获取 2,较旧的数据集获取 1,然后如果行具有重复的主键,我会删除具有1 在旧/新列中。我尝试以多种方式编写此内容,最近通过执行以下操作:

orderBy(keys, desc(old/new)).dropDuplicates(keys)
Run Code Online (Sandbox Code Playgroud)

但在大型数据集上,我总是会出现大幅减速,并显示一条消息:

16/09/21 20:31:45 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (0  time so far)
16/09/21 20:32:00 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (1  time so far)
16/09/21 20:32:16 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (2  times so far)
16/09/21 20:32:31 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (3  times so …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark

5
推荐指数
1
解决办法
8905
查看次数

应该避免使用 DataFrame 函数 groupBy 吗?

链接其他链接groupByKey告诉我,如果有大量密钥,则不应使用Spark ,因为 Spark 会打乱所有密钥。这同样适用于groupBy函数吗?或者这是不同的东西?

我问这个问题是因为我想做这个问题试图做的事情,但我有大量的钥匙。应该可以在不通过本地减少每个节点来打乱所有数据的情况下完成此操作,但我找不到 PySpark 的方法来执行此操作(坦率地说,我发现文档非常缺乏)。

本质上,我想做的是:

# Non-working pseudocode
df.groupBy("A").reduce(lambda x,y: if (x.TotalValue > y.TotalValue) x else y)
Run Code Online (Sandbox Code Playgroud)

然而,dataframe API 不提供“reduce”选项。我可能误解了 dataframe 到底想要实现什么。

python apache-spark apache-spark-sql pyspark

5
推荐指数
1
解决办法
2259
查看次数

按列“ grp”分组并压缩DataFrame-(按列“ ord”排序的每个列的最后一个非空值)

假设我有以下DataFrame:

+---+--------+---+----+----+
|grp|null_col|ord|col1|col2|
+---+--------+---+----+----+
|  1|    null|  3|null|  11|
|  2|    null|  2| xxx|  22|
|  1|    null|  1| yyy|null|
|  2|    null|  7|null|  33|
|  1|    null| 12|null|null|
|  2|    null| 19|null|  77|
|  1|    null| 10| s13|null|
|  2|    null| 11| a23|null|
+---+--------+---+----+----+
Run Code Online (Sandbox Code Playgroud)

这是带有注释的相同样本DF,按grp和排序ord

scala> df.orderBy("grp", "ord").show
+---+--------+---+----+----+
|grp|null_col|ord|col1|col2|
+---+--------+---+----+----+
|  1|    null|  1| yyy|null|
|  1|    null|  3|null|  11|   # grp:1 - last value for `col2` (11)
|  1|    null| 10| s13|null|   # …
Run Code Online (Sandbox Code Playgroud)

scala aggregate-functions aggregation apache-spark

5
推荐指数
1
解决办法
274
查看次数

Spark-sortWithInPartitions超过排序

以下是代表员工in_date和out_date的样本数据集。我必须获取所有员工的最后in_time。

Spark在4节点独立群集上运行。

初始数据集:

员工ID -----入职日期-----离职日期

1111111     2017-04-20  2017-09-14 
1111111     2017-11-02  null 
2222222     2017-09-26  2017-09-26 
2222222     2017-11-28  null 
3333333     2016-01-07  2016-01-20 
3333333     2017-10-25  null 
Run Code Online (Sandbox Code Playgroud)

之后的数据集df.sort(col(in_date).desc())

员工编号-in_date ----- out_date

1111111   2017-11-02   null 
1111111   2017-04-20   2017-09-14 
2222222   2017-09-26   2017-09-26 
2222222   2017-11-28   null 
3333333   2017-10-25   null 
3333333   2016-01-07   2016-01-20 
Run Code Online (Sandbox Code Playgroud)
df.dropDup(EmployeeID):  
Run Code Online (Sandbox Code Playgroud)

输出

员工ID -----入职日期-----离职日期

1111111    2017-11-02    null 
2222222    2017-09-26    2017-09-26 
3333333    2016-01-07    2016-01-20 
Run Code Online (Sandbox Code Playgroud)

预期数据集:

员工ID -----入职日期-----离职日期

1111111    2017-11-02   null 
2222222    2017-11-28   null 
3333333    2017-10-25   null 
Run Code Online (Sandbox Code Playgroud)

但是,当我使用进行初始数据集排序sortWithInPartitions并进行重复数据删除时,我得到了预期的数据集。我在这里错过了大大小小的东西吗?任何帮助表示赞赏。

附加信息: 当在本地模式下用Spark执行df.sort时,实现了上述预期输出。
我没有做任何分区,重新分区。初始数据集是从基础Cassandra数据库获得的。

apache-spark apache-spark-sql spark-cassandra-connector apache-spark-dataset

4
推荐指数
1
解决办法
3399
查看次数