如何检测Spark DataFrame是否具有列

ben*_*ben 39 scala dataframe apache-spark apache-spark-sql

当我DataFrame在Spark SQL中创建一个JSON文件时,如何在调用之前判断给定列是否存在.select

示例JSON模式:

{
  "a": {
    "b": 1,
    "c": 2
  }
}
Run Code Online (Sandbox Code Playgroud)

这就是我想要做的:

potential_columns = Seq("b", "c", "d")
df = sqlContext.read.json(filename)
potential_columns.map(column => if(df.hasColumn(column)) df.select(s"a.$column"))
Run Code Online (Sandbox Code Playgroud)

但我找不到一个好的功能hasColumn.我得到的最接近的是测试列是否在这个有点笨拙的数组中:

scala> df.select("a.*").columns
res17: Array[String] = Array(b, c)
Run Code Online (Sandbox Code Playgroud)

zer*_*323 75

假设它存在并让它失败Try.简单明了,支持任意嵌套:

import scala.util.Try
import org.apache.spark.sql.DataFrame

def hasColumn(df: DataFrame, path: String) = Try(df(path)).isSuccess

val df = sqlContext.read.json(sc.parallelize(
  """{"foo": [{"bar": {"foobar": 3}}]}""" :: Nil))

hasColumn(df, "foobar")
// Boolean = false

hasColumn(df, "foo")
// Boolean = true

hasColumn(df, "foo.bar")
// Boolean = true

hasColumn(df, "foo.bar.foobar")
// Boolean = true

hasColumn(df, "foo.bar.foobaz")
// Boolean = false
Run Code Online (Sandbox Code Playgroud)

甚至更简单:

val columns = Seq(
  "foobar", "foo", "foo.bar", "foo.bar.foobar", "foo.bar.foobaz")

columns.flatMap(c => Try(df(c)).toOption)
// Seq[org.apache.spark.sql.Column] = List(
//   foo, foo.bar AS bar#12, foo.bar.foobar AS foobar#13)
Run Code Online (Sandbox Code Playgroud)

Python等价物:

from pyspark.sql.utils import AnalysisException
from pyspark.sql import Row


def has_column(df, col):
    try:
        df[col]
        return True
    except AnalysisException:
        return False

df = sc.parallelize([Row(foo=[Row(bar=Row(foobar=3))])]).toDF()

has_column(df, "foobar")
## False

has_column(df, "foo")
## True

has_column(df, "foo.bar")
## True

has_column(df, "foo.bar.foobar")
## True

has_column(df, "foo.bar.foobaz")
## False
Run Code Online (Sandbox Code Playgroud)

  • 这也适用于结构化字段。使用 `contains` 函数的解决方案没有!+1 (2认同)

Jai*_*ash 34

我通常使用的另一种选择是

df.columns.contains("column-name-to-check")
Run Code Online (Sandbox Code Playgroud)

这返回一个布尔值

  • 不适用于嵌套列. (5认同)
  • 这是部分不正确的,因为 df.columns 的输出是一个列表 `type(df.columns)` 返回 `<class 'list'>` 并且 list 没有 'contains' 属性。df.columns 中的“col_to_check”没问题 (5认同)
  • 尝试:: df.columns.__contains__("要检查的列名") (2认同)

Dan*_* B. 13

实际上你甚至不需要调用select来使用列,你可以在数据帧本身上调用它

// define test data
case class Test(a: Int, b: Int)
val testList = List(Test(1,2), Test(3,4))
val testDF = sqlContext.createDataFrame(testList)

// define the hasColumn function
def hasColumn(df: org.apache.spark.sql.DataFrame, colName: String) = df.columns.contains(colName)

// then you can just use it on the DF with a given column name
hasColumn(testDF, "a")  // <-- true
hasColumn(testDF, "c")  // <-- false
Run Code Online (Sandbox Code Playgroud)

或者,您可以使用pimp my library模式定义隐式类,以便hasColumn方法直接在数据框上可用

implicit class DataFrameImprovements(df: org.apache.spark.sql.DataFrame) {
    def hasColumn(colName: String) = df.columns.contains(colName)
}
Run Code Online (Sandbox Code Playgroud)

然后你可以用它作为:

testDF.hasColumn("a") // <-- true
testDF.hasColumn("c") // <-- false
Run Code Online (Sandbox Code Playgroud)

  • 这不适用于嵌套列.来自json`{"a":{"b":1,"c":0}}` (5认同)

小智 9

Try不是最优的,因为它会Try在做出决定之前评估内部的表达式。

对于大型数据集,请在以下内容中使用Scala

df.schema.fieldNames.contains("column_name")
Run Code Online (Sandbox Code Playgroud)


mef*_*yar 8

对于那些在寻找 Python 解决方案时偶然发现这个问题的人,我使用:

if 'column_name_to_check' in df.columns:
    # do something
Run Code Online (Sandbox Code Playgroud)

当我尝试 @Jai Prakash 使用 Python 的答案时df.columns.contains('column-name-to-check'),我得到了AttributeError: 'list' object has no attribute 'contains'.