Manually create a pyspark dataframe

Jos*_*osh 21 pyspark pyspark-dataframes

I am trying to manually create a pyspark dataframe from some data:

from pyspark.sql.types import StructType, StructField, DecimalType

row_in = [(1566429545575348), (40.353977), (-111.701859)]
rdd = sc.parallelize(row_in)
schema = StructType([
    StructField("time_epocs", DecimalType(), True),
    StructField("lat", DecimalType(), True),
    StructField("long", DecimalType(), True),
])
df_in_test=spark.createDataFrame(rdd,schema)

This errors out when I try to display the dataframe, so I'm not sure how to do this.

However, the Spark documentation on this seems rather convoluted to me, and I ran into similar errors when I tried to follow those instructions.

Does anyone know how to do this?

Ste*_*ven 55

Simple dataframe creation:

df = spark.createDataFrame(
    [
        (1, "foo"),  # create your data here, be consistent in the types.
        (2, "bar"),
    ],
    ["id", "label"]  # add your column names here
)

df.printSchema()
root
 |-- id: long (nullable = true)
 |-- label: string (nullable = true)

df.show()
+---+-----+                                                                     
| id|label|
+---+-----+
|  1|  foo|
|  2|  bar|
+---+-----+

According to the official documentation:

  • When schema is a list of column names, the type of each column is inferred from the data.
  • When schema is pyspark.sql.types.DataType or a datatype string, it must match the real data.

# Example with a datatype string
df = spark.createDataFrame(
    [
        (1, "foo"),  # Add your data here
        (2, "bar"),
    ],  
    "id int, label string",  # add column names and types here
)

# Example with pyspark.sql.types
from pyspark.sql import types as T
df = spark.createDataFrame(
    [
        (1, "foo"),  # Add your data here
        (2, "bar"),
    ],
    T.StructType(  # Define the whole schema within a StructType
        [
            T.StructField("id", T.IntegerType(), True),
            T.StructField("label", T.StringType(), True),
        ]
    ),
)


df.printSchema()
root
 |-- id: integer (nullable = true)  # id's type is forced to integer
 |-- label: string (nullable = true)
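Applying this to the question's data: the three values form a single row, so they belong in one tuple; also, DecimalType() defaults to precision 10, which cannot hold the 16-digit epoch value. A minimal sketch of a working version (LongType and DoubleType are my assumptions, not types taken from the question):

from pyspark.sql import types as T

df_in_test = spark.createDataFrame(
    [(1566429545575348, 40.353977, -111.701859)],  # one row, three columns
    T.StructType([
        T.StructField("time_epocs", T.LongType(), True),  # the epoch value fits in a long
        T.StructField("lat", T.DoubleType(), True),       # Python floats map to doubles
        T.StructField("long", T.DoubleType(), True),
    ]),
)
df_in_test.show()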


Pow*_*ers 7

This answer demonstrates how to create a PySpark DataFrame with createDataFrame, create_df, and toDF.

df = spark.createDataFrame([("joe", 34), ("luisa", 22)], ["first_name", "age"])

df.show()
+----------+---+
|first_name|age|
+----------+---+
|       joe| 34|
|     luisa| 22|
+----------+---+

You can also pass createDataFrame an RDD and a schema to construct the DataFrame with more precision:


from pyspark.sql import Row
from pyspark.sql.types import *

rdd = spark.sparkContext.parallelize([
    Row(name='Allie', age=2),
    Row(name='Sara', age=33),
    Row(name='Grace', age=31)])

schema = StructType([
   StructField("name", StringType(), True),
   StructField("age", IntegerType(), False)])

df = spark.createDataFrame(rdd, schema)

df.show()
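+-----+---+
| name|age|
+-----+---+
|Allie|  2|
| Sara| 33|
|Grace| 31|
+-----+---+

create_df from my Quinn project gives the best of both worlds - it's concise and fully descriptive. A minimal sketch, assuming the quinn.extensions import that attaches create_df to the SparkSession; the data matches the output shown below:

from pyspark.sql.types import *
from quinn.extensions import *  # assumed to provide spark.create_df

df = spark.create_df(
    [("jose", "a"), ("li", "b"), ("sam", "c")],  # data rows
    [("name", StringType(), True), ("blah", StringType(), True)],  # (name, type, nullable) per column
)

df.show()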
+----+----+
|name|blah|
+----+----+
|jose|   a|
|  li|   b|
| sam|   c|
+----+----+

toDF doesn't offer any advantages over the other approaches:
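A minimal sketch, reusing the rdd of Rows defined above - toDF infers the column names and types from the Row fields:

df = rdd.toDF()

df.show()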

+-----+---+
| name|age|
+-----+---+
|Allie|  2|
| Sara| 33|
|Grace| 31|
+-----+---+


Jos*_*osh 6

Elaborating on / building on @Steven's answer:

from pyspark.sql.types import StructType, StructField, FloatType, StringType

field = [
    StructField("MULTIPLIER", FloatType(), True),
    StructField("DESCRIPTION", StringType(), True),
]
schema = StructType(field)
multiplier_df = sqlContext.createDataFrame(sc.emptyRDD(), schema)

This creates an empty dataframe.

We can now simply add a row to it:

l = [(2.3, "this is a sample description")]
rdd = sc.parallelize(l)
multiplier_df_temp = spark.createDataFrame(rdd, schema)
multiplier_df = multiplier_df.union(multiplier_df_temp)

  • This approach should be avoided, as it is needlessly complicated - see the simpler one-call sketch below. (2 upvotes)
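For comparison, a minimal sketch of the simpler route the comment alludes to - building the single-row dataframe in one call instead of unioning onto an empty one (reusing the schema defined above):

df = spark.createDataFrame(
    [(2.3, "this is a sample description")],  # the same single row
    schema,  # the StructType defined earlier in this answer
)
df.show()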