将值分配给PySpark dataFrame中的特定单元格

NuV*_*lue 1 python dataframe apache-spark pyspark

我想在我的特定细胞更改值Spark DataFrame使用PySpark

琐碎的例子-我创建一个模拟Spark DataFrame

df = spark.createDataFrame(
    [
     (1, 1.87, 'new_york'), 
     (4, 2.76, 'la'), 
     (6, 3.3, 'boston'), 
     (8, 4.1, 'detroit'), 
     (2, 5.70, 'miami'), 
     (3, 6.320, 'atlanta'), 
     (1, 6.1, 'houston')
    ],
    ('variable_1', "variable_2", "variable_3")
)
Run Code Online (Sandbox Code Playgroud)

运行display(df)我得到此表:

variable_1   variable_2   variable_3
    1           1.87    new_york
    4           2.76    la
    6           3.3     boston
    8           4.1     detroit
    2           5.7     miami
    3           6.32    atlanta
    1           6.1     houston
Run Code Online (Sandbox Code Playgroud)

Let's说,例如,我想分配为第4行和第3列的单元格的新值,即改变detroitnew_orleans。我知道作业在中有效df.iloc[4, 3] = 'new_orleans'df.loc[4, 'detroit'] = 'new_orleans'无效Spark

使用此问题的有效答案when是:

from pyspark.sql.functions import when
targetDf = df.withColumn("variable_3", \
              when(((df["variable_1"] == 8) & (df["variable_2"] == 4.1)) , 'new_orleans').otherwise(df["variable_3"]))
Run Code Online (Sandbox Code Playgroud)

我的问题是:是否可以通过更实际的方式完成此操作,PySpark而不必输入我只想更改1个单个单元格的行的所有值和列名(也许不使用该when函数就可以实现相同的功能)?

在此先感谢您的帮助,并感谢@ useruser9806664的反馈。

use*_*664 5

Spark DataFrames不可变的不提供随机访问权限,严格来说,它是无序的。结果是:

  • 您不能分配任何内容(因为属性是不可变的)。
  • 您无法访问特定的行(因为没有随机访问权限)。
  • 行“ indcies”的定义不明确(因为无序)。

您可以做的是使用新的列创建一个新的数据框,使用一些条件表达式替换现有的数据框,该条件表达式已包含在您找到的答案中。

另外,monotonically_increasing_id不添加索引(行号)。它添加单调递增的数字,不一定是连续的数字或从任何特定值开始(如果是空分区)。


Gui*_*cia 5

考虑使用 Pandas DataFrame

Spark DataFrame 确实是不可变的,因此它们不适合修改。Spark Dataframe 是一种分布式数据集合,针对处理大量数据进行了优化,如果您想要进行任何更改,则必须根据需要的修改创建一个新数据帧。

然而,有时您可能需要修改特定行的特定单元格。对于这些情况,您可以使用when函数(就像您在示例中所做的那样)修改列,并将单元格的值与您要修改的特定单元格位于同一行。或者,您可以考虑将 Spark Dataframe 转换为 Pandas DataFrame (可变),并在将新值分配给相关单元格后,将其转换回 Spark DataFrame。您可以这样做:

# Copy the schema of your Spark dataframe 
schema = df.schema

# Create Pandas Dataframe using your Spark DataFrame
pandas_df = df.toPandas()

# Assign the new value to the specific cell (you could use .at or .loc)
pandas_df.at[3, 'variable_3'] = 'new_orleans'

# Update your dataframe with the new value using the Pandas DataFrame
df = spark.createDataFrame(pandas_df,schema=schema)

# Delete the auxiliary pandas dataframe to free memory for other uses
del pandas_df
Run Code Online (Sandbox Code Playgroud)

请记住,Pandas DataFrame 不是分布式的,对于大量数据,Pandas DataFrame 中的处理速度会较慢。