kar*_*son 6 python apache-spark apache-spark-sql pyspark
我正在使用pyspark(Apache Spark)的DataFrame API,并遇到以下问题:
当我连接两个源自相同源DataFrame的DataFrame时,生成的DF将爆炸到大量行.一个简单的例子:
我n从磁盘加载一个DataFrame :
df = sql_context.parquetFile('data.parquet')
Run Code Online (Sandbox Code Playgroud)
然后我从该源创建两个DataFrame.
df_one = df.select('col1', 'col2')
df_two = df.select('col1', 'col3')
Run Code Online (Sandbox Code Playgroud)
最后我想(内部)将它们重新组合在一起:
df_joined = df_one.join(df_two, df_one['col1'] == df_two['col1'], 'inner')
Run Code Online (Sandbox Code Playgroud)
关键col1是独特的.生成的DataFrame应该有n行,但它确实有n*n行.
做不到这一点,当我加载df_one并df_two直接从磁盘.我在Spark 1.3.0上,但这也发生在当前的1.4.0快照上.
谁能解释为什么会这样?
我在 Spark 1.3 的大型数据集中也看到了这个问题。不幸的是,在我编造的小而人为的例子中,“加入”工作正常。我觉得加入之前的步骤可能存在一些潜在的错误
执行连接(注意:DateTime 只是一个字符串):
> join = df1.join(df2, df1.DateTime == df2.DateTime, "inner")
> join.count()
250000L
Run Code Online (Sandbox Code Playgroud)
这显然返回了完整的 500*500 笛卡尔连接。
对我有用的是切换到 SQL:
> sqlc.registerDataFrameAsTable(df1, "df1")
> sqlc.registerDataFrameAsTable(df2, "df2")
> join = sqlc.sql("select * from df1, df2 where df1.DateTime = df2.DateTime")
> join.count()
471L
Run Code Online (Sandbox Code Playgroud)
这个值看起来是正确的。
看到这一点,我个人不会使用 pyspark 的 DataFrame.join() ,直到我能更好地理解这种差异。
| 归档时间: |
|
| 查看次数: |
10640 次 |
| 最近记录: |