标签: spark3

to_date 无法解析 Spark 3.0 中的日期

我正在尝试使用解析日期,to_date()但出现以下异常。

SparkUpgradeException: 由于 Spark 3.0 升级,您可能会得到不同的结果:Fail to parse '12/1/2010 8:26' in the new parser。您可以将 spark.sql.legacy.timeParserPolicy 设置为 LEGACY 以恢复 Spark 3.0 之前的行为,或者设置为 CORRECTED 并将其视为无效的日期时间字符串。

例外情况表明我应该使用旧版时间解析器,首先我不知道如何将其设置为旧版。

这是我的实现

dfWithDate = df.withColumn("date", to_date(col("InvoiceDate"), "MM/dd/yyyy"))
Run Code Online (Sandbox Code Playgroud)

我的日期采用以下格式

+--------------+
|   InvoiceDate|
+--------------+
|12/1/2010 8:26|
|12/1/2010 8:26|
|12/1/2010 8:26|
|12/1/2010 8:26|
|12/1/2010 8:26|
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark spark3

13
推荐指数
4
解决办法
8907
查看次数

Spark 3.0 读取 json 文件比 Spark 2.4 慢得多

我有大量的 json 文件,Spark 可以在 36 秒内读取这些文件,但 Spark 3.0 需要将近 33 分钟才能读取相同的文件。仔细分析,看起来 Spark 3.0 选择的 DAG 与 Spark 2.0 不同。有谁知道发生了什么?Spark 3.0.0 是否有任何配置问题?

火花2.4

scala> spark.time(spark.read.json("/data/20200528"))
Time taken: 19691 ms
res61: org.apache.spark.sql.DataFrame = [created: bigint, id: string ... 5 more fields]

scala> spark.time(res61.count())
Time taken: 7113 ms
res64: Long = 2605349
Run Code Online (Sandbox Code Playgroud)

火花3.0

scala> spark.time(spark.read.json("/data/20200528"))
20/06/29 08:06:53 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
Time taken: 849652 ms …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark java-11 spark3

6
推荐指数
1
解决办法
991
查看次数

Spark 3.0出现以下问题如何解决?无法创建托管表。关联位置已存在。;

在我的 Spark 工作中,我尝试覆盖结构化流的每个微批次中的一个表

batchDF.write.mode(SaveMode.Overwrite).saveAsTable("mytable")
Run Code Online (Sandbox Code Playgroud)

它产生了以下错误。

  Can not create the managed table('`mytable`'). The associated location('file:/home/ec2-user/environment/spark/spark-local/spark-warehouse/mytable') already exists.;

Run Code Online (Sandbox Code Playgroud)

我知道在 Spark 2.xx 中,解决这个问题的方法是添加以下选项。

spark.conf.set("spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation","true")
Run Code Online (Sandbox Code Playgroud)

它在 Spark 2.xx 中运行良好。不过,这个选项在 Spark 3.0.0 中被删除了。那么,在Spark 3.0.0中我们应该如何解决这个问题呢?

谢谢!

apache-spark spark-streaming spark3

6
推荐指数
1
解决办法
7850
查看次数

使用 zstd 压缩编解码器时 Spark 3.0.1 任务失败

我使用Spark 3.0.1和用户提供的Hadoop 3.2.0Scala 2.12.10Kubernetes 上运行。

读取压缩成一个文件拼花时一切正常活泼的,但是当我尝试读取压缩成一个文件拼花zstd以下错误下几项任务失败:

java.io.IOException: Decompression error: Version not supported
at com.github.luben.zstd.ZstdInputStream.readInternal(ZstdInputStream.java:164)
at com.github.luben.zstd.ZstdInputStream.read(ZstdInputStream.java:120)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
at java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2781)
at java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2797)
at java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:3274)
at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:934)
at java.io.ObjectInputStream.(ObjectInputStream.java:396)
at org.apache.spark.MapOutputTracker$.deserializeObject$1(MapOutputTracker.scala:954)
at org.apache.spark.MapOutputTracker$.deserializeMapStatuses(MapOutputTracker.scala:964)
at org.apache.spark.MapOutputTrackerWorker.$anonfun$getStatuses$2(MapOutputTracker.scala:856)
at org.apache.spark.util.KeyLock.withLock(KeyLock.scala:64)
at org.apache.spark.MapOutputTrackerWorker.getStatuses(MapOutputTracker.scala:851)
at org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:808)
at org.apache.spark.shuffle.sort.SortShuffleManager.getReader(SortShuffleManager.scala:128)
at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:185)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349) …
Run Code Online (Sandbox Code Playgroud)

apache-spark zstd spark3

5
推荐指数
1
解决办法
599
查看次数

在 Spark 中将日期转换为 ISO 周日期

一列中有日期,如何创建包含ISO 周日期的列?

ISO 周日期由年份周数工作日组成。

  • 年份与使用函数获得的年份不同year
  • 周数是简单的部分 - 可以使用 获得weekofyear
  • weekday应该为星期一返回 1,为星期日返回 7,而 Spark 则dayofweek不能这样做。

示例数据框:

from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([
    ('1977-12-31',),
    ('1978-01-01',),
    ('1978-01-02',),
    ('1978-12-31',),
    ('1979-01-01',),
    ('1979-12-30',),
    ('1979-12-31',),
    ('1980-01-01',)],
    ['my_date']
).select(F.col('my_date').cast('date'))

df.show()
#+----------+
#|   my_date|
#+----------+
#|1977-12-31|
#|1978-01-01|
#|1978-01-02|
#|1978-12-31|
#|1979-01-01|
#|1979-12-30|
#|1979-12-31|
#|1980-01-01|
#+----------+
Run Code Online (Sandbox Code Playgroud)

期望的结果:

+----------+-------------+
|   my_date|iso_week_date|
+----------+-------------+
|1977-12-31|   1977-W52-6|
|1978-01-01|   1977-W52-7|
|1978-01-02|   1978-W01-1|
|1978-12-31| …
Run Code Online (Sandbox Code Playgroud)

date apache-spark apache-spark-sql pyspark spark3

4
推荐指数
1
解决办法
3779
查看次数

如何以加密格式保存spark数据集?

我将 Spark 数据集保存为本地计算机中的 parquet 文件。我想知道是否有任何方法可以使用某种加密算法来加密数据。我用来将数据保存为镶木地板文件的代码看起来像这样。

dataset.write().mode("overwrite").parquet(parquetFile);

我看到了类似的问题,但当我写入本地磁盘时,我的查询有所不同。

java encryption hadoop apache-spark spark3

1
推荐指数
1
解决办法
2006
查看次数