Pandas 到 PySpark:将一列元组列表转换为每个元组项的单独列

iva*_*lan 5 python dataframe pandas apache-spark-sql pyspark

我需要转换一个 DataFrame,其中一列包含一个元组列表,每个元组中的每个项目都必须是一个单独的列。

这是 Pandas 中的一个示例和解决方案:

import pandas as pd

df_dict = {
    'a': {
        "1": "stuff", "2": "stuff2"
    }, 

    "d": {
        "1": [(1, 2), (3, 4)], "2": [(1, 2), (3, 4)]
    }
}

df = pd.DataFrame.from_dict(df_dict)
print(df)  # intial structure

           a    d
    1   stuff   [(1, 2), (3, 4)]
    2   stuff2  [(1, 2), (3, 4)]

# first transformation, let's separate each list item into a new row
row_breakdown = df.set_index(["a"])["d"].apply(pd.Series).stack()
print(row_breakdown)

            a        
    stuff   0    (1, 2)
            1    (3, 4)
    stuff2  0    (1, 2)
            1    (3, 4)
    dtype: object

row_breakdown = row_breakdown.reset_index().drop(columns=["level_1"])
print(row_breakdown)

    a   0
    0   stuff   (1, 2)
    1   stuff   (3, 4)
    2   stuff2  (1, 2)
    3   stuff2  (3, 4)

# second transformation, let's get each tuple item into a separate column
row_breakdown.columns = ["a", "d"]
row_breakdown = row_breakdown["d"].apply(pd.Series)
row_breakdown.columns = ["value_1", "value_2"]
print(row_breakdown)

        value_1 value_2
    0   1   2
    1   3   4
    2   1   2
    3   3   4
Run Code Online (Sandbox Code Playgroud)

这是熊猫解决方案。我需要能够做同样的事情,但使用 PySpark (2.3)。我已经开始研究它,但立即卡住了:

from pyspark.context import SparkContext, SparkConf
from pyspark.sql.session import SparkSession

conf = SparkConf().setAppName("appName").setMaster("local")
sc = SparkContext(conf=conf)

spark = SparkSession(sc)

df_dict = {
    'a': {
        "1": "stuff", "2": "stuff2"
    }, 

    "d": {
        "1": [(1, 2), (3, 4)], "2": [(1, 2), (3, 4)]
    }
}

df = pd.DataFrame(df_dict)
ddf = spark.createDataFrame(df)

row_breakdown = ddf.set_index(["a"])["d"].apply(pd.Series).stack()

    AttributeError: 'DataFrame' object has no attribute 'set_index'
Run Code Online (Sandbox Code Playgroud)

显然,Spark 不支持索引。任何指针表示赞赏。

mar*_*oyo 2

这可能会做:

from pyspark.context import SparkContext, SparkConf
from pyspark.sql.session import SparkSession
from pyspark.sql import functions as F
import pandas as pd

conf = SparkConf().setAppName("appName").setMaster("local")
sc = SparkContext(conf=conf)

spark = SparkSession(sc)

df_dict = {
    'a': {
        "1": "stuff", "2": "stuff2"
    }, 

    "d": {
        "1": [(1, 2), (3, 4)], "2": [(1, 2), (3, 4)]
    }
}

df = pd.DataFrame(df_dict)
ddf = spark.createDataFrame(df)


exploded = ddf.withColumn('d', F.explode("d"))
exploded.show()
Run Code Online (Sandbox Code Playgroud)

结果:

+------+------+
|     a|     d|
+------+------+
| stuff|[1, 2]|
| stuff|[3, 4]|
|stuff2|[1, 2]|
|stuff2|[3, 4]|
+------+------+
Run Code Online (Sandbox Code Playgroud)

我觉得使用 SQL 更舒服:

exploded.createOrReplaceTempView("exploded")
spark.sql("SELECT a, d._1 as value_1, d._2 as value_2 FROM exploded").show()
Run Code Online (Sandbox Code Playgroud)

重要提示:之所以使用_1and_2访问器,是因为 Spark 将元组解析为结构并为其提供了默认键。如果在您的实际实现中,数据帧包含array<int>,您应该使用该[0]语法。

最终结果是:

+------+-------+-------+
|     a|value_1|value_2|
+------+-------+-------+
| stuff|      1|      2|
| stuff|      3|      4|
|stuff2|      1|      2|
|stuff2|      3|      4|
+------+-------+-------+
Run Code Online (Sandbox Code Playgroud)