我正在尝试使用Spark数据帧而不是RDD,因为它们看起来比RDD更高级,并且往往会产生更易读的代码.
在一个14节点的Google Dataproc集群中,我有大约6百万个名称被两个不同的系统转换为ID:sa和sb.每个Row包含name,id_sa和id_sb.我的目标是从生产映射id_sa到id_sb使得对于每id_sa时,相应的id_sb是连接到所有名称中最常见的ID id_sa.
让我们试着用一个例子来澄清.如果我有以下行:
[Row(name='n1', id_sa='a1', id_sb='b1'),
Row(name='n2', id_sa='a1', id_sb='b2'),
Row(name='n3', id_sa='a1', id_sb='b2'),
Row(name='n4', id_sa='a2', id_sb='b2')]
Run Code Online (Sandbox Code Playgroud)
我的目标是从生产映射a1到b2.事实上,相关的名称a1是n1,n2和n3,分别映射b1,b2和b2,因此b2是相关联的名称最常见的映射a1.以同样的方式,a2将映射到b2.可以假设总有一个胜利者:不需要打破关系.
我希望我可以使用groupBy(df.id_sa)我的数据帧,但我不知道接下来该做什么.我希望最终会产生以下行的聚合:
[Row(id_sa=a1, max_id_sb=b2),
Row(id_sa=a2, max_id_sb=b2)]
Run Code Online (Sandbox Code Playgroud)
但也许我正在尝试使用错误的工具,我应该回到使用RDD.
假设我们有DataFrame,df包含以下列:
名称,姓氏,大小,宽度,长度,重量
现在我们想要执行几个操作,例如我们想要创建一些包含Size和Width数据的DataFrame.
val df1 = df.groupBy("surname").agg( sum("size") )
val df2 = df.groupBy("surname").agg( sum("width") )
Run Code Online (Sandbox Code Playgroud)
正如您所注意到的,其他列(如Length)不会在任何地方使用.Spark是否足够聪明,可以在洗牌阶段之前丢弃多余的列,还是随身携带?威尔跑:
val dfBasic = df.select("surname", "size", "width")
Run Code Online (Sandbox Code Playgroud)
在分组之前以某种方式影响性能?
performance dataframe apache-spark apache-spark-sql apache-spark-dataset
给定以下DataFrame:
+----+-----+---+-----+
| uid| k| v|count|
+----+-----+---+-----+
| a|pref1| b| 168|
| a|pref3| h| 168|
| a|pref3| t| 63|
| a|pref3| k| 84|
| a|pref1| e| 84|
| a|pref2| z| 105|
+----+-----+---+-----+
Run Code Online (Sandbox Code Playgroud)
我怎样才能获得最大值uid,k但包括v?
+----+-----+---+----------+
| uid| k| v|max(count)|
+----+-----+---+----------+
| a|pref1| b| 168|
| a|pref3| h| 168|
| a|pref2| z| 105|
+----+-----+---+----------+
Run Code Online (Sandbox Code Playgroud)
我可以做这样的事情,但它会删除列"v":
df.groupBy("uid", "k").max("count")
Run Code Online (Sandbox Code Playgroud) 在Spark 1.6.0/Scala中,是否有机会获得collect_list("colC")或collect_set("colC").over(Window.partitionBy("colA").orderBy("colB")?
我在SparkSQL中有一个应用程序返回大量非常难以适应内存的行,因此我无法在DataFrame上使用collect函数,是否有一种方法可以将所有这些行作为Iterable instaed of整个行作为列表.
注意:我正在使用yarn-client执行此SparkSQL应用程序
我正在研究一个代表事件流的数据集(比如从网站上发布的跟踪事件).所有活动都有时间戳.我们经常遇到的一个用例是尝试找到给定字段的第一个非空值.因此,举例来说,最让我们感受到的是:
val eventsDf = spark.read.json(jsonEventsPath)
case class ProjectedFields(visitId: String, userId: Int, timestamp: Long ... )
val projectedEventsDs = eventsDf.select(
eventsDf("message.visit.id").alias("visitId"),
eventsDf("message.property.user_id").alias("userId"),
eventsDf("message.property.timestamp"),
...
).as[ProjectedFields]
projectedEventsDs.groupBy($"visitId").agg(first($"userId", true))
Run Code Online (Sandbox Code Playgroud)
上述代码的问题first在于无法保证馈送到该聚合函数的数据的顺序.我希望它被排序timestamp以确保它是时间戳的第一个非null userId而不是任何随机的非null userId.
有没有办法在分组中定义排序?
使用Spark 2.10
BTW,在SPARK DataFrame中为Spark 2.10建议的方式:选择每个组的第一行是在分组之前进行排序 - 这不起作用.例如,以下代码:
case class OrderedKeyValue(key: String, value: String, ordering: Int)
val ds = Seq(
OrderedKeyValue("a", null, 1),
OrderedKeyValue("a", null, 2),
OrderedKeyValue("a", "x", 3),
OrderedKeyValue("a", "y", 4),
OrderedKeyValue("a", null, 5)
).toDS()
ds.orderBy("ordering").groupBy("key").agg(first("value", true)).collect()
Run Code Online (Sandbox Code Playgroud)
有时会返回Array([a,y]),有时Array([a,x])
我正在尝试编写一个 ETL 过程,在联合之前合并两个数据集,我向每个数据集添加一列,较新的数据集获取 2,较旧的数据集获取 1,然后如果行具有重复的主键,我会删除具有1 在旧/新列中。我尝试以多种方式编写此内容,最近通过执行以下操作:
orderBy(keys, desc(old/new)).dropDuplicates(keys)
Run Code Online (Sandbox Code Playgroud)
但在大型数据集上,我总是会出现大幅减速,并显示一条消息:
16/09/21 20:31:45 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (0 time so far)
16/09/21 20:32:00 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (1 time so far)
16/09/21 20:32:16 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (2 times so far)
16/09/21 20:32:31 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (3 times so …Run Code Online (Sandbox Code Playgroud) 此链接和其他链接groupByKey告诉我,如果有大量密钥,则不应使用Spark ,因为 Spark 会打乱所有密钥。这同样适用于groupBy函数吗?或者这是不同的东西?
我问这个问题是因为我想做这个问题试图做的事情,但我有大量的钥匙。应该可以在不通过本地减少每个节点来打乱所有数据的情况下完成此操作,但我找不到 PySpark 的方法来执行此操作(坦率地说,我发现文档非常缺乏)。
本质上,我想做的是:
# Non-working pseudocode
df.groupBy("A").reduce(lambda x,y: if (x.TotalValue > y.TotalValue) x else y)
Run Code Online (Sandbox Code Playgroud)
然而,dataframe API 不提供“reduce”选项。我可能误解了 dataframe 到底想要实现什么。
假设我有以下DataFrame:
+---+--------+---+----+----+
|grp|null_col|ord|col1|col2|
+---+--------+---+----+----+
| 1| null| 3|null| 11|
| 2| null| 2| xxx| 22|
| 1| null| 1| yyy|null|
| 2| null| 7|null| 33|
| 1| null| 12|null|null|
| 2| null| 19|null| 77|
| 1| null| 10| s13|null|
| 2| null| 11| a23|null|
+---+--------+---+----+----+
Run Code Online (Sandbox Code Playgroud)
这是带有注释的相同样本DF,按grp和排序ord:
scala> df.orderBy("grp", "ord").show
+---+--------+---+----+----+
|grp|null_col|ord|col1|col2|
+---+--------+---+----+----+
| 1| null| 1| yyy|null|
| 1| null| 3|null| 11| # grp:1 - last value for `col2` (11)
| 1| null| 10| s13|null| # …Run Code Online (Sandbox Code Playgroud) 以下是代表员工in_date和out_date的样本数据集。我必须获取所有员工的最后in_time。
Spark在4节点独立群集上运行。
初始数据集:
员工ID -----入职日期-----离职日期
1111111 2017-04-20 2017-09-14
1111111 2017-11-02 null
2222222 2017-09-26 2017-09-26
2222222 2017-11-28 null
3333333 2016-01-07 2016-01-20
3333333 2017-10-25 null
Run Code Online (Sandbox Code Playgroud)
之后的数据集df.sort(col(in_date).desc()):
员工编号-in_date ----- out_date
1111111 2017-11-02 null
1111111 2017-04-20 2017-09-14
2222222 2017-09-26 2017-09-26
2222222 2017-11-28 null
3333333 2017-10-25 null
3333333 2016-01-07 2016-01-20
Run Code Online (Sandbox Code Playgroud)
df.dropDup(EmployeeID):
Run Code Online (Sandbox Code Playgroud)
输出:
员工ID -----入职日期-----离职日期
1111111 2017-11-02 null
2222222 2017-09-26 2017-09-26
3333333 2016-01-07 2016-01-20
Run Code Online (Sandbox Code Playgroud)
预期数据集:
员工ID -----入职日期-----离职日期
1111111 2017-11-02 null
2222222 2017-11-28 null
3333333 2017-10-25 null
Run Code Online (Sandbox Code Playgroud)
但是,当我使用进行初始数据集排序sortWithInPartitions并进行重复数据删除时,我得到了预期的数据集。我在这里错过了大大小小的东西吗?任何帮助表示赞赏。
附加信息:
当在本地模式下用Spark执行df.sort时,实现了上述预期输出。
我没有做任何分区,重新分区。初始数据集是从基础Cassandra数据库获得的。
apache-spark apache-spark-sql spark-cassandra-connector apache-spark-dataset