I need to define custom methods on a DataFrame. Is there a better way to do it? The solution should be scalable, as I intend to define a large number of custom methods.
My current approach is to create a class (say MyClass) that takes a DataFrame as a parameter, define my custom methods in it (say customMethod), and define an implicit method that converts a DataFrame to MyClass:
implicit def dataFrametoMyClass(df: DataFrame): MyClass = new MyClass(df)
This way, I can call:
dataFrame.customMethod()
Is this the right approach? I'm open to suggestions.
I have a StructField in a DataFrame that is not nullable. A simple example:
import pyspark.sql.functions as F
from pyspark.sql.types import *
l = [('Alice', 1)]
df = sqlContext.createDataFrame(l, ['name', 'age'])
df = df.withColumn('foo', F.when(df['name'].isNull(),False).otherwise(True))
df.schema.fields
This returns:
[StructField(name,StringType,true), StructField(age,LongType,true), StructField(foo,BooleanType,false)]
Note that the field foo is not nullable. The problem is that (for reasons I won't go into) I want it to be nullable. I found the post Change nullable property of column in spark dataframe, which suggests a way to do this, so I adapted its code to:
import pyspark.sql.functions as F
from pyspark.sql.types import *
l = [('Alice', 1)]
df = sqlContext.createDataFrame(l, ['name', 'age'])
df = df.withColumn('foo', F.when(df['name'].isNull(),False).otherwise(True))
df.schema.fields
newSchema = [StructField('name',StringType(),True), StructField('age',LongType(),True),StructField('foo',BooleanType(),False)]
df2 = sqlContext.createDataFrame(df.rdd, newSchema)
This fails with:
TypeError: StructField(name,StringType,true) is not JSON serializable
I also see this in the stack trace:
raise ValueError("Circular reference detected")
So I'm a bit stuck. Can anyone modify this example in a way that allows me to define a column that can …
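For reference, a commonly suggested workaround (not from the post above; it assumes the same sqlContext and that createDataFrame is given a proper StructType rather than a bare list of StructField objects) is to rebuild the DataFrame from its RDD with a schema in which foo is declared nullable:
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, LongType, BooleanType

l = [('Alice', 1)]
df = sqlContext.createDataFrame(l, ['name', 'age'])
df = df.withColumn('foo', F.when(df['name'].isNull(), False).otherwise(True))

# Wrap the fields in a StructType and declare 'foo' as nullable
newSchema = StructType([
    StructField('name', StringType(), True),
    StructField('age', LongType(), True),
    StructField('foo', BooleanType(), True)])
df2 = sqlContext.createDataFrame(df.rdd, newSchema)
df2.schema.fields
# [StructField(name,StringType,true), StructField(age,LongType,true), StructField(foo,BooleanType,true)]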
In Spark, a literal column is not nullable once added:
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1,)], ['c1'])
df = df.withColumn('c2', F.lit('a'))
df.printSchema()
# root
# |-- c1: long (nullable = true)
# |-- c2: string (nullable = false)
How can I create a nullable column?
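One commonly cited trick (not from the post above) is to route the literal through F.when without an otherwise branch; since the missing branch could produce null, Spark marks the resulting column as nullable:
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1,)], ['c1'])

# when() without otherwise() may yield null, so the column is inferred as nullable
df = df.withColumn('c2', F.when(F.lit(True), F.lit('a')))
df.printSchema()
# root
# |-- c1: long (nullable = true)
# |-- c2: string (nullable = true)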
I am trying to save data from a Spark DataFrame to HDFS using an Avro schema stored in a schema registry, but I get an error when writing the data:
Caused by: org.apache.avro.AvroRuntimeException: Not a union: {"type":"long","logicalType":"timestamp-millis"}
at org.apache.avro.Schema.getTypes(Schema.java:299)
at org.apache.spark.sql.avro.AvroSerializer.org$apache$spark$sql$avro$AvroSerializer$$resolveNullableType(AvroSerializer.scala:229)
at org.apache.spark.sql.avro.AvroSerializer$$anonfun$3.apply(AvroSerializer.scala:209)
at org.apache.spark.sql.avro.AvroSerializer$$anonfun$3.apply(AvroSerializer.scala:208)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:296)
at org.apache.spark.sql.avro.AvroSerializer.newStructConverter(AvroSerializer.scala:208)
at org.apache.spark.sql.avro.AvroSerializer.<init>(AvroSerializer.scala:51)
at org.apache.spark.sql.avro.AvroOutputWriter.serializer$lzycompute(AvroOutputWriter.scala:42)
at org.apache.spark.sql.avro.AvroOutputWriter.serializer(AvroOutputWriter.scala:42)
at org.apache.spark.sql.avro.AvroOutputWriter.write(AvroOutputWriter.scala:64)
at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.write(FileFormatDataWriter.scala:137)
What could be causing this?
The field in the Avro schema looks like this:
{"name":"CreateDate","type":["null",{"type":"long","logicalType":"timestamp-millis"}],"default":null}
Here is an example of the date format:
1900-01-01 00:00:00
The field's data type in the Spark DataFrame:
|-- CreateDate: timestamp (nullable = true)
This is how I write the data:
dataDF.write
  .mode("append")
  .format("avro")
  .option(
    "avroSchema",
    SchemaRegistry.getSchema(
      schemaRegistryConfig.url,
      schemaRegistryConfig.dataSchemaSubject,
      schemaRegistryConfig.dataSchemaVersion))
  .save(hdfsURL)