如在Web上的许多 其他位置所述,向现有DataFrame添加新列并不简单.不幸的是,拥有此功能非常重要(即使它在分布式环境中效率低下),尤其是在尝试连接两个DataFrames时unionAll.
将null列添加到a DataFrame以便于实现最优雅的解决方法是unionAll什么?
我的版本是这样的:
from pyspark.sql.types import StringType
from pyspark.sql.functions import UserDefinedFunction
to_none = UserDefinedFunction(lambda x: None, StringType())
new_df = old_df.withColumn('new_column', to_none(df_old['any_col_from_old']))
Run Code Online (Sandbox Code Playgroud) 作为一个简化示例,我有一个数据框"df",其列为"col1,col2",我想在将函数应用于每列后计算行的最大值:
def f(x):
return (x+1)
max_udf=udf(lambda x,y: max(x,y), IntegerType())
f_udf=udf(f, IntegerType())
df2=df.withColumn("result", max_udf(f_udf(df.col1),f_udf(df.col2)))
Run Code Online (Sandbox Code Playgroud)
所以如果df:
col1 col2
1 2
3 0
Run Code Online (Sandbox Code Playgroud)
然后
DF2:
col1 col2 result
1 2 3
3 0 4
Run Code Online (Sandbox Code Playgroud)
以上似乎不起作用并产生"无法评估表达式:PythonUDF#f ......"
我绝对肯定"f_udf"在我的桌子上运行得很好,主要问题在于max_udf.
如果不创建额外的列或使用基本的map/reduce,有没有办法完全使用数据帧和udfs?我该如何修改"max_udf"?
我也尝试过:
max_udf=udf(max, IntegerType())
Run Code Online (Sandbox Code Playgroud)
这会产生相同的错误.
我还确认以下工作:
df2=(df.withColumn("temp1", f_udf(df.col1))
.withColumn("temp2", f_udf(df.col2))
df2=df2.withColumn("result", max_udf(df2.temp1,df2.temp2))
Run Code Online (Sandbox Code Playgroud)
为什么我不能一气呵成呢?
我希望看到一个可以概括为任何函数"f_udf"和"max_udf"的答案.
我是一个火花应用程序,有几点我想坚持当前的状态.这通常是在一大步之后,或缓存我想要多次使用的状态.看来,当我第二次在我的数据帧上调用缓存时,新副本会缓存到内存中.在我的应用程序中,这会在扩展时导致内存问题.即使在我当前的测试中,给定的数据帧最大约为100 MB,中间结果的累积大小也会超出执行程序的分配内存.请参阅下面的一个显示此行为的小示例.
cache_test.py:
from pyspark import SparkContext, HiveContext
spark_context = SparkContext(appName='cache_test')
hive_context = HiveContext(spark_context)
df = (hive_context.read
.format('com.databricks.spark.csv')
.load('simple_data.csv')
)
df.cache()
df.show()
df = df.withColumn('C1+C2', df['C1'] + df['C2'])
df.cache()
df.show()
spark_context.stop()
Run Code Online (Sandbox Code Playgroud)
simple_data.csv:
1,2,3
4,5,6
7,8,9
Run Code Online (Sandbox Code Playgroud)
查看应用程序UI,有一个原始数据框的副本,与新列的副本相对应.我可以通过df.unpersist()在withColumn行之前调用来删除原始副本.这是删除缓存中间结果的推荐方法(即在每次调用之前调用unpersist cache()).
此外,是否可以清除所有缓存的对象.在我的应用程序中,有一些自然断点,我可以简单地清除所有内存,然后转到下一个文件.我想这样做而不为每个输入文件创建一个新的spark应用程序.
先感谢您!
我想使用spark withColumnRenamed函数更改两列的名称.当然,我可以写:
data = sqlContext.createDataFrame([(1,2), (3,4)], ['x1', 'x2'])
data = (data
.withColumnRenamed('x1','x3')
.withColumnRenamed('x2', 'x4'))
Run Code Online (Sandbox Code Playgroud)
但我想一步到位(有新名单的列表/元组).不幸的是,这不是:
data = data.withColumnRenamed(['x1', 'x2'], ['x3', 'x4'])
Run Code Online (Sandbox Code Playgroud)
也不是这样
data = data.withColumnRenamed(('x1', 'x2'), ('x3', 'x4'))
Run Code Online (Sandbox Code Playgroud)
工作中.有可能这样做吗?
我最近在 Spark 中遇到了一些奇怪的事情。据我了解,鉴于spark dfs的基于列的存储方法,列的顺序确实没有任何意义,它们就像字典中的键。
在 a 期间df.union(df2),列的顺序重要吗?我会假设它不应该,但根据 sql 论坛的智慧,它确实如此。
所以我们有 df1
df1
| a| b|
+---+----+
| 1| asd|
| 2|asda|
| 3| f1f|
+---+----+
df2
| b| a|
+----+---+
| asd| 1|
|asda| 2|
| f1f| 3|
+----+---+
result
| a| b|
+----+----+
| 1| asd|
| 2|asda|
| 3| f1f|
| asd| 1|
|asda| 2|
| f1f| 3|
+----+----+
Run Code Online (Sandbox Code Playgroud)
看起来使用了 df1 中的架构,但数据似乎已按照其原始数据帧的顺序加入。显然,解决方案是这样做df1.union(df2.select(df1.columns))
但主要问题是,它为什么要这样做?仅仅是因为它是 pyspark.sql 的一部分,还是 Spark 中有一些我在理解上搞砸了的底层数据架构?
如果有人想尝试创建测试集的代码
d1={'a':[1,2,3], 'b':['asd','asda','f1f']}
d2={ 'b':['asd','asda','f1f'], 'a':[1,2,3],}
pdf1=pd.DataFrame(d1)
pdf2=pd.DataFrame(d2) …Run Code Online (Sandbox Code Playgroud) 我有一个 pandas 数据框,我想将其转换为 Spark 数据框。通常,我使用下面的代码从 pandas 创建 Spark 数据框,但突然我开始收到以下错误,我知道 pandas 已删除 iteritems() 但我当前的 pandas 版本是 2.0.0 并且我尝试安装较小的版本并尝试创建 Spark df 但我仍然遇到相同的错误。该错误在 Spark 函数内部调用。解决这个问题的办法是什么?我应该安装哪个 pandas 版本才能创建 Spark df. 我还尝试更改集群数据块的运行时并尝试重新运行,但仍然遇到相同的错误。
import pandas as pd
spark.createDataFrame(pd.DataFrame({'i':[1,2,3],'j':[1,2,3]}))
error:-
UserWarning: createDataFrame attempted Arrow optimization because 'spark.sql.execution.arrow.pyspark.enabled' is set to true; however, failed by the reason below:
'DataFrame' object has no attribute 'iteritems'
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
warn(msg)
AttributeError: 'DataFrame' object has no attribute 'iteritems'
Run Code Online (Sandbox Code Playgroud) 我有以下格式的数据(RDD或Spark DataFrame):
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
rdd = sc.parallelize([('X01',41,'US',3),
('X01',41,'UK',1),
('X01',41,'CA',2),
('X02',72,'US',4),
('X02',72,'UK',6),
('X02',72,'CA',7),
('X02',72,'XX',8)])
# convert to a Spark DataFrame
schema = StructType([StructField('ID', StringType(), True),
StructField('Age', IntegerType(), True),
StructField('Country', StringType(), True),
StructField('Score', IntegerType(), True)])
df = sqlContext.createDataFrame(rdd, schema)
Run Code Online (Sandbox Code Playgroud)
我想做的是'重塑'数据,将Country(特别是美国,英国和CA)中的某些行转换为列:
ID Age US UK CA
'X01' 41 3 1 2
'X02' 72 4 6 7
Run Code Online (Sandbox Code Playgroud)
从本质上讲,我需要Python的pivot工作流程:
categories = ['US', 'UK', 'CA']
new_df = df[df['Country'].isin(categories)].pivot(index = 'ID',
columns = 'Country',
values = 'Score')
Run Code Online (Sandbox Code Playgroud)
我的数据集相当大,所以我不能真正地collect()将数据摄取到内存中来进行Python本身的重塑.有没有办法 …
我有Spark DataFrame,带有(5)个顶行,如下所示:
[Row(date=datetime.datetime(1984, 1, 1, 0, 0), hour=1, value=638.55),
Row(date=datetime.datetime(1984, 1, 1, 0, 0), hour=2, value=638.55),
Row(date=datetime.datetime(1984, 1, 1, 0, 0), hour=3, value=638.55),
Row(date=datetime.datetime(1984, 1, 1, 0, 0), hour=4, value=638.55),
Row(date=datetime.datetime(1984, 1, 1, 0, 0), hour=5, value=638.55)]
Run Code Online (Sandbox Code Playgroud)
它的架构定义为:
elevDF.printSchema()
root
|-- date: timestamp (nullable = true)
|-- hour: long (nullable = true)
|-- value: double (nullable = true)
Run Code Online (Sandbox Code Playgroud)
如何从"日期"字段中获取年,月,日值?
我正在使用PySpark,我有一个带有一堆数字列的Spark数据帧.我想添加一个列,它是所有其他列的总和.
假设我的数据框有"a","b"和"c"列.我知道我可以这样做:
df.withColumn('total_col', df.a + df.b + df.c)
Run Code Online (Sandbox Code Playgroud)
问题是我不想单独输出每一列并添加它们,特别是如果我有很多列.我希望能够自动执行此操作,或者通过指定要添加的列名列表.还有另一种方法吗?
我使用CassandraSQLContextspark-shell来查询来自Cassandra的数据.所以,我想知道两个方面,一个是如何获取超过20行CassandraSQLContext,第二个是如何Id显示列的完整值.正如您在默认情况下可以看到的那样,它会在字符串值中附加点.
代码:
val csc = new CassandraSQLContext(sc)
csc.setKeyspace("KeySpace")
val maxDF = csc.sql("SQL_QUERY" )
maxDF.show
Run Code Online (Sandbox Code Playgroud)
输出:
+--------------------+--------------------+-----------------+--------------------+
| id| Col2| Col3| Col4|
+--------------------+--------------------+-----------------+--------------------+
|8wzloRMrGpf8Q3bbk...| Value1| X| K1|
|AxRfoHDjV1Fk18OqS...| Value2| Y| K2|
|FpMVRlaHsEOcHyDgy...| Value3| Z| K3|
|HERt8eFLRtKkiZndy...| Value4| U| K4|
|nWOcbbbm8ZOjUSNfY...| Value5| V| K5|
Run Code Online (Sandbox Code Playgroud) apache-spark ×10
pyspark ×10
python ×7
caching ×1
databricks ×1
dataframe ×1
pandas ×1
pivot ×1
pyspark-sql ×1
rename ×1
scala ×1
timestamp ×1