我正在尝试使用解析日期,to_date()但出现以下异常。
SparkUpgradeException: 由于 Spark 3.0 升级,您可能会得到不同的结果:Fail to parse '12/1/2010 8:26' in the new parser。您可以将 spark.sql.legacy.timeParserPolicy 设置为 LEGACY 以恢复 Spark 3.0 之前的行为,或者设置为 CORRECTED 并将其视为无效的日期时间字符串。
例外情况表明我应该使用旧版时间解析器,首先我不知道如何将其设置为旧版。
这是我的实现
dfWithDate = df.withColumn("date", to_date(col("InvoiceDate"), "MM/dd/yyyy"))
Run Code Online (Sandbox Code Playgroud)
我的日期采用以下格式
+--------------+
| InvoiceDate|
+--------------+
|12/1/2010 8:26|
|12/1/2010 8:26|
|12/1/2010 8:26|
|12/1/2010 8:26|
|12/1/2010 8:26|
Run Code Online (Sandbox Code Playgroud) 我有大量的 json 文件,Spark 可以在 36 秒内读取这些文件,但 Spark 3.0 需要将近 33 分钟才能读取相同的文件。仔细分析,看起来 Spark 3.0 选择的 DAG 与 Spark 2.0 不同。有谁知道发生了什么?Spark 3.0.0 是否有任何配置问题?
火花2.4
scala> spark.time(spark.read.json("/data/20200528"))
Time taken: 19691 ms
res61: org.apache.spark.sql.DataFrame = [created: bigint, id: string ... 5 more fields]
scala> spark.time(res61.count())
Time taken: 7113 ms
res64: Long = 2605349
Run Code Online (Sandbox Code Playgroud)
火花3.0
scala> spark.time(spark.read.json("/data/20200528"))
20/06/29 08:06:53 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
Time taken: 849652 ms …Run Code Online (Sandbox Code Playgroud) 在我的 Spark 工作中,我尝试覆盖结构化流的每个微批次中的一个表
batchDF.write.mode(SaveMode.Overwrite).saveAsTable("mytable")
Run Code Online (Sandbox Code Playgroud)
它产生了以下错误。
Can not create the managed table('`mytable`'). The associated location('file:/home/ec2-user/environment/spark/spark-local/spark-warehouse/mytable') already exists.;
Run Code Online (Sandbox Code Playgroud)
我知道在 Spark 2.xx 中,解决这个问题的方法是添加以下选项。
spark.conf.set("spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation","true")
Run Code Online (Sandbox Code Playgroud)
它在 Spark 2.xx 中运行良好。不过,这个选项在 Spark 3.0.0 中被删除了。那么,在Spark 3.0.0中我们应该如何解决这个问题呢?
谢谢!
我使用Spark 3.0.1和用户提供的Hadoop 3.2.0和Scala 2.12.10在Kubernetes 上运行。
读取压缩成一个文件拼花时一切正常活泼的,但是当我尝试读取压缩成一个文件拼花zstd以下错误下几项任务失败:
java.io.IOException: Decompression error: Version not supported
at com.github.luben.zstd.ZstdInputStream.readInternal(ZstdInputStream.java:164)
at com.github.luben.zstd.ZstdInputStream.read(ZstdInputStream.java:120)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
at java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2781)
at java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2797)
at java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:3274)
at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:934)
at java.io.ObjectInputStream.(ObjectInputStream.java:396)
at org.apache.spark.MapOutputTracker$.deserializeObject$1(MapOutputTracker.scala:954)
at org.apache.spark.MapOutputTracker$.deserializeMapStatuses(MapOutputTracker.scala:964)
at org.apache.spark.MapOutputTrackerWorker.$anonfun$getStatuses$2(MapOutputTracker.scala:856)
at org.apache.spark.util.KeyLock.withLock(KeyLock.scala:64)
at org.apache.spark.MapOutputTrackerWorker.getStatuses(MapOutputTracker.scala:851)
at org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:808)
at org.apache.spark.shuffle.sort.SortShuffleManager.getReader(SortShuffleManager.scala:128)
at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:185)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349) …Run Code Online (Sandbox Code Playgroud) 一列中有日期,如何创建包含ISO 周日期的列?
ISO 周日期由年份、周数和工作日组成。
year。weekofyear。dayofweek不能这样做。示例数据框:
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([
('1977-12-31',),
('1978-01-01',),
('1978-01-02',),
('1978-12-31',),
('1979-01-01',),
('1979-12-30',),
('1979-12-31',),
('1980-01-01',)],
['my_date']
).select(F.col('my_date').cast('date'))
df.show()
#+----------+
#| my_date|
#+----------+
#|1977-12-31|
#|1978-01-01|
#|1978-01-02|
#|1978-12-31|
#|1979-01-01|
#|1979-12-30|
#|1979-12-31|
#|1980-01-01|
#+----------+
Run Code Online (Sandbox Code Playgroud)
期望的结果:
+----------+-------------+
| my_date|iso_week_date|
+----------+-------------+
|1977-12-31| 1977-W52-6|
|1978-01-01| 1977-W52-7|
|1978-01-02| 1978-W01-1|
|1978-12-31| …Run Code Online (Sandbox Code Playgroud) 我将 Spark 数据集保存为本地计算机中的 parquet 文件。我想知道是否有任何方法可以使用某种加密算法来加密数据。我用来将数据保存为镶木地板文件的代码看起来像这样。
dataset.write().mode("overwrite").parquet(parquetFile);
我看到了类似的问题,但当我写入本地磁盘时,我的查询有所不同。
apache-spark ×6
spark3 ×6
pyspark ×2
date ×1
encryption ×1
hadoop ×1
java ×1
java-11 ×1
scala ×1
zstd ×1