Spark Dataframe验证镶木地板写入(scala)的列名称

cod*_*mer 7 apache-spark parquet spark-streaming apache-spark-sql spark-dataframe

我正在使用从JSON事件流转换而来的Dataframes来处理事件,这些事件最终会像Parquet格式一样被写出来.

但是,一些JSON事件在键中包含空格,我想在将它转换为Parquet之前从数据框中记录和过滤/删除这些事件,因为,; {}()\n\t =被认为是Parquet中的特殊字符schema(CatalystSchemaConverter)如下面[1]中所列,因此不应在列名中使用.

如何在Dataframe上对列名进行此类验证,并完全删除此类事件,而不会错误输出Spark Streaming作业.

[1] Spark的CatalystSchemaConverter

def checkFieldName(name: String): Unit = {
    // ,;{}()\n\t= and space are special characters in Parquet schema
    checkConversionRequirement(
      !name.matches(".*[ ,;{}()\n\t=].*"),
      s"""Attribute name "$name" contains invalid character(s) among " ,;{}()\\n\\t=".
         |Please use alias to rename it.
       """.stripMargin.split("\n").mkString(" ").trim)
  }
Run Code Online (Sandbox Code Playgroud)

Jan*_*fer 12

对于在pyspark 中遇到此问题的每个人:在重命名列后,这甚至发生在我身上。经过一些迭代后,我可以让它工作的一种方法是:

file = "/opt/myfile.parquet"
df = spark.read.parquet(file)
for c in df.columns:
    df = df.withColumnRenamed(c, c.replace(" ", ""))

df = spark.read.schema(df.schema).parquet(file)

Run Code Online (Sandbox Code Playgroud)

  • 这对我有用,但该列的内容为空。.... (5认同)

bla*_*hop 7

在写入 parquet 之前,您可以使用正则表达式用下划线替换所有无效字符。此外,也要从列名中去除重音符号。

这是一个normalize为 Scala 和 Python 执行此操作的函数:

斯卡拉

/**
  * Normalize column name by replacing invalid characters with underscore
  * and strips accents
  *
  * @param columns dataframe column names list
  * @return the list of normalized column names
  */
def normalize(columns: Seq[String]): Seq[String] = {
  columns.map { c =>
    org.apache.commons.lang3.StringUtils.stripAccents(c.replaceAll("[ ,;{}()\n\t=]+", "_"))
  }
}

// using the function
val df2 = df.toDF(normalize(df.columns):_*)
Run Code Online (Sandbox Code Playgroud)

Python

import unicodedata
import re

def normalize(column: str) -> str:
    """
    Normalize column name by replacing invalid characters with underscore
    strips accents and make lowercase
    :param column: column name
    :return: normalized column name
    """
    n = re.sub(r"[ ,;{}()\n\t=]+", '_', column.lower())
    return unicodedata.normalize('NFKD', n).encode('ASCII', 'ignore').decode()


# using the function
df = df.toDF(*map(normalize, df.columns))

Run Code Online (Sandbox Code Playgroud)


小智 1

用于alias更改不带这些特殊字符的字段名称。