Mar*_*nne 11 scala apache-spark apache-spark-sql
是否可以向DataFrames 添加额外的元数据?
我有Spark DataFrame,我需要保留额外的信息.示例:A DataFrame,我想要"记住"Integer id列中使用率最高的索引.
我使用单独的DataFrame来存储这些信息.当然,单独保存这些信息是单调乏味且容易出错的.
有没有更好的解决方案来存储这样的额外信息DataFrame?
chb*_*own 12
要扩展和Scala-fy nealmcb的答案(问题标记为scala,而不是python,所以我认为这个答案不会是主题或冗余),假设您有一个DataFrame:
import org.apache.spark.sql
val df = sc.parallelize(Seq.fill(100) { scala.util.Random.nextInt() }).toDF("randInt")
Run Code Online (Sandbox Code Playgroud)
还有一些方法可以在DataFrame上获得最大值或任何想要记忆的内容:
val randIntMax = df.rdd.map { case sql.Row(randInt: Int) => randInt }.reduce(math.max)
Run Code Online (Sandbox Code Playgroud)
sql.types.Metadata只能包含字符串,布尔值,某些类型的数字和其他元数据结构.所以我们必须使用Long:
val metadata = new sql.types.MetadataBuilder().putLong("columnMax", randIntMax).build()
Run Code Online (Sandbox Code Playgroud)
DataFrame.withColumn()实际上有一个重载,允许在最后提供元数据参数,但它被莫名其妙地标记为[private],所以我们只是做它做的 - 使用Column.as(alias, metadata):
val newColumn = df.col("randInt").as("randInt_withMax", metadata)
val dfWithMax = df.withColumn("randInt_withMax", newColumn)
Run Code Online (Sandbox Code Playgroud)
dfWithMax 现在有(一列)您想要的元数据!
dfWithMax.schema.foreach(field => println(s"${field.name}: metadata=${field.metadata}"))
> randInt: metadata={}
> randInt_withMax: metadata={"columnMax":2094414111}
Run Code Online (Sandbox Code Playgroud)
或者以编程方式和类型安全(排序; Metadata.getLong()和其他人不返回Option并且可能抛出"未找到密钥"异常):
dfWithMax.schema("randInt_withMax").metadata.getLong("columnMax")
> res29: Long = 209341992
Run Code Online (Sandbox Code Playgroud)
在您的情况下将max附加到列是有意义的,但是在将元数据附加到DataFrame而不是特定列的一般情况下,看起来您必须采用其他答案描述的包装器路由.
从Spark 1.2开始,StructType模式具有一个metadata属性,该属性可以保存Dataframe中每个Column的任意映射/信息字典.例如(当与单独的spark-csv库一起使用时):
customSchema = StructType([
StructField("cat_id", IntegerType(), True,
{'description': "Unique id, primary key"}),
StructField("cat_title", StringType(), True,
{'description': "Name of the category, with underscores"}) ])
categoryDumpDF = (sqlContext.read.format('com.databricks.spark.csv')
.options(header='false')
.load(csvFilename, schema = customSchema) )
f = categoryDumpDF.schema.fields
["%s (%s): %s" % (t.name, t.dataType, t.metadata) for t in f]
["cat_id (IntegerType): {u'description': u'Unique id, primary key'}",
"cat_title (StringType): {u'description': u'Name of the category, with underscores.'}"]
Run Code Online (Sandbox Code Playgroud)
这是在添加[火花3569]元数据字段添加到StructField - ASF JIRA,并且设计用于在机器学习管道使用来跟踪关于存储在列中的功能,如分类/连续的,数量的类别信息,类别对索引图.请参阅SPARK-3569:向StructField设计文档添加元数据字段.
我想更广泛地使用它,例如列的描述和文档,列中使用的测量单位,坐标轴信息等.
问题包括如何在转换列时适当地保留或操作元数据信息,如何处理多种元数据,如何使其全部可扩展等.
为了那些考虑在Spark数据帧中扩展此功能的人的利益,我引用了一些关于Pandas的类似讨论.
例如,请参阅xray - 将pandas的标记数据功能引入支持标记数组元数据的物理科学.
请参阅允许将自定义元数据附加到面板/ df /系列的 Pandas元数据的讨论?·问题#2485·pydata/pandas.
另见与单位有关的讨论:ENH:计量单位/物理量·问题#10349·pydata/pandas
小智 2
我会在你的数据框周围存储一个包装器。例如:
case class MyDFWrapper(dataFrame: DataFrame, metadata: Map[String, Long])
val maxIndex = df1.agg("index" ->"MAX").head.getLong(0)
MyDFWrapper(df1, Map("maxIndex" -> maxIndex))
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
9791 次 |
| 最近记录: |