如何在pyspark中将DataFrame转换回普通RDD?

jav*_*dba 48 python apache-spark pyspark

我需要使用

(rdd.)partitionBy(npartitions, custom_partitioner)
Run Code Online (Sandbox Code Playgroud)

DataFrame上不可用的方法.所有DataFrame方法仅引用DataFrame结果.那么如何从DataFrame数据创建RDD呢?

注意:这是从1.2.0开始的更改(在1.3.0中).

从@dpangmao的答案更新:方法是.rdd.我有兴趣了解(a)它是否公开以及(b)性能影响是什么.

那么(a)是肯定的和(b) - 你可以在这里看到有重要的性能影响:必须通过调用mapPartitions创建一个新的RDD :

dataframe.py中(注意文件名也改变了(是sql.py):

@property
def rdd(self):
    """
    Return the content of the :class:`DataFrame` as an :class:`RDD`
    of :class:`Row` s.
    """
    if not hasattr(self, '_lazy_rdd'):
        jrdd = self._jdf.javaToPython()
        rdd = RDD(jrdd, self.sql_ctx._sc, BatchedSerializer(PickleSerializer()))
        schema = self.schema

        def applySchema(it):
            cls = _create_cls(schema)
            return itertools.imap(cls, it)

        self._lazy_rdd = rdd.mapPartitions(applySchema)

    return self._lazy_rdd
Run Code Online (Sandbox Code Playgroud)

dap*_*mao 92

使用这样的方法.rdd:

rdd = df.rdd
Run Code Online (Sandbox Code Playgroud)

  • 是但它转换为org.apache.spark.rdd.RDD [org.apache.spark.sql.Row]但不转换为org.apache.spark.rdd.RDD [string] (16认同)

ken*_*yut 59

@dapangmao的答案有效,但它不会给常规的火花RDD,它会返回一个Row对象.如果你想拥有常规的RDD格式.

试试这个:

rdd = df.rdd.map(tuple)
Run Code Online (Sandbox Code Playgroud)

要么

rdd = df.rdd.map(list)
Run Code Online (Sandbox Code Playgroud)

  • 调用`df.rdd`时,这应该是默认行为imo (3认同)

Nil*_*esh 5

kennyut/Kistian 给出的答案效果很好,但是当RDD 由属性列表(例如 [1,2,3,4])组成时,要获得像输出一样的准确 RDD,我们可以使用 flatmap 命令,如下所示,

rdd = df.rdd.flatMap(list)
or 
rdd = df.rdd.flatmap(lambda x: list(x))
Run Code Online (Sandbox Code Playgroud)