kar*_*oma 5 concurrency scala apache-spark
我正在使用Spark来获取包含一列日期的数据框,并创建3个新列,其中包含列中日期和今天之间的天,周和月的时间.
我关心的是使用SimpleDateFormat,它不是线程安全的.通常没有Spark这可以,因为它是一个局部变量,但是使用Spark的懒惰评估,在多个UDF上共享一个SimpleDateFormat实例可能会导致问题?
def calcTimeDifference(...){
val sdf = new SimpleDateFormat(dateFormat)
val dayDifference = udf{(x: String) => math.abs(Days.daysBetween(new DateTime(sdf.parse(x)), presentDate).getDays)}
output = output.withColumn("days", dayDifference(myCol))
val weekDifference = udf{(x: String) => math.abs(Weeks.weeksBetween(new DateTime(sdf.parse(x)), presentDate).getWeeks)}
output = output.withColumn("weeks", weekDifference(myCol))
val monthDifference = udf{(x: String) => math.abs(Months.monthsBetween(new DateTime(sdf.parse(x)), presentDate).getMonths)}
output = output.withColumn("months", monthDifference(myCol))
}
Run Code Online (Sandbox Code Playgroud)
小智 0
我认为它不安全,正如我们所知,SimpleDateFormat不是线程安全的。
因此,如果您需要,我更喜欢使用Spark 中的SimpleDateFormat这种方法:
import java.text.SimpleDateFormat
import java.util.SimpleTimeZone
/**
* Thread Safe SimpleDateFormat for Spark.
*/
object ThreadSafeFormat extends ThreadLocal[SimpleDateFormat] {
override def initialValue(): SimpleDateFormat = {
val dateFormat = new SimpleDateFormat("yyyy-MM-dd:H")
// if you need get UTC time, you can set UTC timezone
val utcTimeZone = new SimpleTimeZone(SimpleTimeZone.UTC_TIME, "UTC")
dateFormat.setTimeZone(utcTimeZone)
dateFormat
}
}
Run Code Online (Sandbox Code Playgroud)
然后使用ThreadSafeFormat.get()获取线程安全的 SimpleDateFormat 来执行任何操作。