如何在Pyspark中定义空数据框并附加相应的数据帧?

Gau*_*wla 3 pyspark pyspark-sql

所以我想从目录中读取csv文件,作为pyspark数据帧,然后将它们附加到单个数据帧中.在pyspark中没有得到替代品,就像我们在熊猫中所做的那样.

例如在Pandas,我们做:

files=glob.glob(path +'*.csv')

df=pd.DataFrame() 

for f in files:
    dff=pd.read_csv(f,delimiter=',')
    df.append(dff)
Run Code Online (Sandbox Code Playgroud)

在Pyspark,我试过这个,但没有成功

schema=StructType([])
union_df = sqlContext.createDataFrame(sc.emptyRDD(),schema)

for f in files:
    dff = sqlContext.read.load(f,format='com.databricks.spark.csv',header='true',inferSchema='true',delimiter=',')
    df=df.union_All(dff)
Run Code Online (Sandbox Code Playgroud)

非常感谢任何帮助.

谢谢

Nim*_*m J 9

在spark 2.1中完成此操作的一种方法:

files=glob.glob(path +'*.csv')

for idx,f in enumerate(files):
    if idx == 0:
        df = spark.read.csv(f,header=True,inferSchema=True)
        dff = df
    else:
        df = spark.read.csv(f,header=True,inferSchema=True)
        dff=dff.unionAll(df)
Run Code Online (Sandbox Code Playgroud)


小智 7

首先定义模式,然后您可以使用 unionAll 将新数据帧连接到空数据帧,甚至运行迭代将一堆数据帧组合在一起。

from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType

sc = SparkContext(conf=SparkConf())
spark = SparkSession(sc)     # Need to use SparkSession(sc) to createDataFrame

schema = StructType([
    StructField("column1",StringType(),True),
    StructField("column2",StringType(),True)
])
empty = spark.createDataFrame(sc.emptyRDD(), schema)

empty = empty.unionAll(addOndata)
Run Code Online (Sandbox Code Playgroud)


小智 6

在 2 个数据帧上使用“unionAll”时,架构应该相同。因此,空数据框的架构应该与 csv 架构一致。

例如:

schema = StructType([
    StructField("v1", LongType(), True), StructField("v2", StringType(), False), StructField("v3", StringType(), False)
])
df = sqlContext.createDataFrame([],schema)
Run Code Online (Sandbox Code Playgroud)

或者你可以这样做:

f = files.pop(0)
df = sqlContext.read.load(f,format='com.databricks.spark.csv',header='true',inferSchema='true',delimiter=',')
for f in files:
    dff = sqlContext.read.load(f,format='com.databricks.spark.csv',header='true',inferSchema='true',delimiter=',')
    df=df.union_All(dff)
Run Code Online (Sandbox Code Playgroud)


ris*_*137 5

你可以在这里使用一个空的 DataFrame。创建一个空列表并继续向其中添加子 DataFrame。添加完要合并的所有 DataFrame 后,使用 union 对列表进行归约,它将把所有这些 DataFrame 合并到一个 DataFrame 中。

list_of_dfs = []
for i in number_of_dfs:
    list_of_dfs.append(df_i)
combined_df = reduce(DataFrame.union, list_of_dfs)
Run Code Online (Sandbox Code Playgroud)