men*_*h84 22 python apache-spark apache-spark-sql pyspark
I am using Spark 1.3.0 with Python. I have a DataFrame and I want to add an additional column that is derived from other columns. Like this:
>>> old_df.columns
[col_1, col_2, ..., col_m]
>>> new_df.columns
[col_1, col_2, ..., col_m, col_n]
where
col_n = col_3 - col_4
How can I do this in PySpark?
zer*_*323 32
One way to achieve this is to use the withColumn method:
old_df = sqlContext.createDataFrame(sc.parallelize(
[(0, 1), (1, 3), (2, 5)]), ('col_1', 'col_2'))
new_df = old_df.withColumn('col_n', old_df.col_1 - old_df.col_2)
Alternatively, you can use SQL on a registered table:
old_df.registerTempTable('old_df')
new_df = sqlContext.sql('SELECT *, col_1 - col_2 AS col_n FROM old_df')
Myk*_*tko 12
You can add a new column in any of the following ways:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([[1, 2], [3, 4]], ['col1', 'col2'])
df.show()
+----+----+
|col1|col2|
+----+----+
|   1|   2|
|   3|   4|
+----+----+
-- Using the withColumn method:
import pyspark.sql.functions as F
df.withColumn('col3', F.col('col2') - F.col('col1')) # col function
df.withColumn('col3', df['col2'] - df['col1']) # bracket notation
df.withColumn('col3', df.col2 - df.col1) # dot notation
-- Using the select method:
df.select('*', (F.col('col2') - F.col('col1')).alias('col3'))
The expression '*' selects all existing columns.
-- Using the selectExpr method:
df.selectExpr('*', 'col2 - col1 as col3')
-- Using SQL:
df.createOrReplaceTempView('df_view')
spark.sql('select *, col2 - col1 as col3 from df_view')
Result:
+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   2|   1|
|   3|   4|   1|
+----+----+----+
Alternatively, we can use a udf:
from pyspark.sql.functions import udf,col
from pyspark.sql.types import IntegerType
from pyspark import SparkContext
from pyspark.sql import SQLContext
sc = SparkContext()
sqlContext = SQLContext(sc)
old_df = sqlContext.createDataFrame(sc.parallelize(
[(0, 1), (1, 3), (2, 5)]), ('col_1', 'col_2'))
function = udf(lambda col1, col2 : col1-col2, IntegerType())
new_df = old_df.withColumn('col_n',function(col('col_1'), col('col_2')))
new_df.show()