Kaz*_*yur 2 dataframe apache-spark apache-spark-sql
我有 2 个大数据帧要根据关联键进行合并。使用join需要更长的时间才能完成任务。
我看到cogroup在 Apache Spark中使用优于加入。任何人都可以指出如何cogroup在 DataFrames上使用或建议更好的方法来合并 2 个大型 DataFrames。
谢谢
火花 >= 3.0
由于 3.0 Sparkcogroup使用 Pandas / Arrow提供 PySpark 特定的。一般语法如下:
left.cogroup(right).apply(f)
Run Code Online (Sandbox Code Playgroud)
其中both和right是GroupedData对象并且f是一个COGROUPED_MAP用户定义的函数,它接受两个 PandasDataFrames并返回 PandasDataFrame
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pandas.core.frame import DataFrame as PandasDataFrame
@pandas_udf(schema)
def f(left: PandasDataFrame, right: PandasDataFrame) -> PandasDataFrame: ...
Run Code Online (Sandbox Code Playgroud)
火花 >= 1.6
JVMKeyValueGroupedDataset提供了 Java
def cogroup[U, R](other: KeyValueGroupedDataset[K, U], f: CoGroupFunction[K, V, U, R], encoder: Encoder[R]): Dataset[R]
Run Code Online (Sandbox Code Playgroud)
和斯卡拉
def cogroup[U, R](other: KeyValueGroupedDataset[K, U])(f: (K, Iterator[V], Iterator[U]) ? TraversableOnce[R])(implicit arg0: Encoder[R]): Dataset[R]
Run Code Online (Sandbox Code Playgroud)
然而,它适用于“强”类型的变体,而不是Dataset[Row],并且极不可能为您声明的目标(性能改进)做出贡献。
Spark < 1.6(这部分继续有效,除了上面列出的小 API 添加)。
DataFrame不提供任何等效的cogroup函数,并且复杂对象不是 Spark SQL 中的一等公民。复杂结构上可用的一组操作相当有限,因此通常您必须创建非平凡的自定义表达式或使用 UDF 并支付性能损失。此外,Spark SQL 不使用与joinplain相同的逻辑RDDs。
关于 RDD。虽然存在cogroup可以有利的边界情况,join但通常情况下不应该是这种情况,除非结果 -> 完整数据集的笛卡尔积。在 RDD 上的所有连接都使用cogroup后跟表示,flatMapValues并且由于后一个操作是本地的,唯一真正的开销是创建输出元组。
如果您的表只包含原始类型,您可以通过将列与collect_listfirst聚合来模仿 co-group 之类的行为,但我不希望这里有任何性能提升。