我想知道加载增量表特定分区的最佳方法是什么?选项 2 是否在过滤之前加载所有表?
df = spark.read.format("delta").option('basePath','/mnt/raw/mytable/')\
.load('/mnt/raw/mytable/ingestdate=20210703')
Run Code Online (Sandbox Code Playgroud)
(这里需要basePath选项吗?)
df = spark.read.format("delta").load('/mnt/raw/mytable/')
df = df.filter(col('ingestdate')=='20210703')
Run Code Online (Sandbox Code Playgroud)
提前谢谢了 !
partitioning apache-spark pyspark azure-databricks delta-lake
我有一个简单的 Spark 作业,将数据流式传输到 Delta 表。该表非常小并且没有分区。
创建了许多小镶木地板文件。
按照文档(https://docs.delta.io/1.0.0/best-practices.html)中的建议,我添加了每天运行一次的压缩作业。
val path = "..."
val numFiles = 16
spark.read
.format("delta")
.load(path)
.repartition(numFiles)
.write
.option("dataChange", "false")
.format("delta")
.mode("overwrite")
.save(path)
Run Code Online (Sandbox Code Playgroud)
每次压缩作业运行时,流作业都会出现以下异常:
org.apache.spark.sql.delta.ConcurrentAppendException: Files were added to the root of the table by a concurrent update. Please try the operation again.
Run Code Online (Sandbox Code Playgroud)
我尝试将以下配置参数添加到流作业中:
spark.databricks.delta.retryWriteConflict.enabled = true # would be false by default
spark.databricks.delta.retryWriteConflict.limit = 3 # optionally limit the maximum amout of retries
Run Code Online (Sandbox Code Playgroud)
这没有帮助。
知道如何解决这个问题吗?
当我尝试在本地使用 koalas.DataFrame.to_delta() 将 Koalas DataFrame 直接写入增量表时,出现以下 Pyspark 异常:
java.lang.ClassNotFoundException: Failed to find data source: delta
编辑:忽略下面,直接调用 Pyspark 也会出现问题。
如果我将 Koalas DataFrame 转换为 Spark DataFrame 然后写入 delta,我似乎没有问题。是否存在 Koalas 不知道但 Pyspark 知道的底层库?看起来很奇怪,因为我认为在幕后使用相同的 Pyspark 模块...我应该注意到 Koalas to_delta() 方法似乎确实在 Databricks 上工作,这表明我的本地设置缺少与 Delta 相关的库。
失败的考拉代码:
kdf = ks.DataFrame({'eid': [1, 2, 3],
'contigName': ['chr1', 'chr2', 'chr3'],
'phen1': [0.123, 0.456, 0.789],
'phen2': [0.987, 0.654, 0.321]})
kdf.to_delta(path='tmp/test.delta', mode='overwrite')
Run Code Online (Sandbox Code Playgroud)
编辑:不让考拉 Spark 到 Delta 代码:
kdf = ks.DataFrame({'eid': [1, 2, 3],
'contigName': ['chr1', 'chr2', 'chr3'],
'phen1': [0.123, 0.456, 0.789], …Run Code Online (Sandbox Code Playgroud) 我正在使用 DeltaLake API 使用下面的代码更新表中的行
DeltaTable.forPath(sparkSession, cleanDataPath)
.as("target")
.merge(df.as("source"), "target.desk_mirror_name = source.desk_mirror_name AND target.price = source.price AND target.valuationdate = source.valuationdate AND target.valuationversion = source.valuationversion")
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.execute();
Run Code Online (Sandbox Code Playgroud)
这应该匹配源表和目标表之间的所有列,除了列valuationtag
合并之前,目标表如下所示
+----------------+------+-------------+----------------+------------+
|desk_mirror_name| price|valuationdate|valuationversion|valuationtag|
+----------------+------+-------------+----------------+------------+
| Sample|499.97| 2021-06-10| 210611170317|210611170317|
| Sample|967.93| 2021-06-10| 210611170317|210611170317|
| Sample| 500.0| 2021-06-10| 210611170317|210611170317|
+----------------+------+-------------+----------------+------------+
Run Code Online (Sandbox Code Playgroud)
源表(应更新目标表)如下
+----------------+------+-------------+----------------+------------+
|desk_mirror_name| price|valuationdate|valuationversion|valuationtag|
+----------------+------+-------------+----------------+------------+
| Sample|499.97| 2021-06-10| 210611170317| OFFICIAL|
| Sample| 500.0| 2021-06-10| 210611170317| OFFICIAL|
| Sample|967.93| 2021-06-10| 210611170317| OFFICIAL|
+----------------+------+-------------+----------------+------------+
Run Code Online (Sandbox Code Playgroud)
仅valuationtag更改为OFFICIAL。有了这个,更新后的表是
+----------------+------+-------------+----------------+------------+
|desk_mirror_name| price|valuationdate|valuationversion|valuationtag|
+----------------+------+-------------+----------------+------------+ …Run Code Online (Sandbox Code Playgroud) 我多次在 Databricks 中更新增量表时遇到问题,第一次覆盖架构失败,但第二次成功。我的问题的解决方案是简单地再次运行它,但此时我无法重现。如果再次发生这种情况,我会回来并发布确切的错误消息,但这本质上是架构不匹配错误。还有其他人遇到过类似的问题吗?
overwriteSchema = True
DF.write \
.format("delta") \
.mode("overwrite") \
.option("overwriteSchema", overwriteSchema) \
.partitionBy(datefield) \
.saveAsTable(deltatable)
Run Code Online (Sandbox Code Playgroud) Databricks文档中的所有示例均采用 Scala 语言。无法从 PySpark 找到如何使用此触发器类型。是否有等效的 API 或解决方法?
DELTA不具有CREATE TABLE LIKE。它确实有CTAS。
我只想复制表的定义LOCATION,但还要指定.
例如,这不起作用:
CREATE TABLE IF NOT EXISTS NEW_CUSTOMER_FEED
AS SELECT * from NEW_CUSTOMER_FEED WHERE 1 = 0
LOCATION '/atRest/data'
Run Code Online (Sandbox Code Playgroud)
我缺少什么?
我是 Spark 和 Delta Lake 的新手。我正在增量表之上创建配置单元表。我有必要的罐子 delta-core-shaded- assembly_2.11-0.1.0.jar、hive-delta_2.11-0.1.0.jar;在蜂巢类路径中。设置以下属性。
SET hive.input.format=io.delta.hive.HiveInputFormat;
SET hive.tez.input.format=io.delta.hive.HiveInputFormat;
Run Code Online (Sandbox Code Playgroud)
但是在创建表时
CREATE EXTERNAL TABLE hive_table(col1 INT, col2 STRING)
STORED BY 'io.delta.hive.DeltaStorageHandler'
LOCATION '/path/to/my/delta/table'
得到以下异常:
FAILED:执行错误,从 org.apache.hadoop.hive.ql.exec.DDLTask 返回代码 1。org.apache.spark.network.util.JavaUtils.byteStringAs(Ljava/lang/String;Lorg/apache/spark/network/util/ByteUnit;)J
两个表的模式匹配。堆栈详细信息:Spark:2.4.4 Hive:1.2.1
任何帮助深表感谢。提前致谢。
Delta Lake 在哪里存储表元数据信息。我在我的独立机器上使用 Spark 2.6(不是 Databricks)。我的假设是,如果我重新启动 Spark,在 Delta Lake Spark 中创建的表将被删除(从 Jupyter Notebook 尝试)。但事实并非如此。
我有两个 pyspark 流工作:
streaming_job_a从 kafka 读取,将包含一列中的原始数据和另一列中的时间戳的数据帧写入 s3 中的位置,并使用位置A创建非托管增量表table_aAstreaming_job_b从增量表中读取table_a,将原始数据提取到单独的列中,写入到Bs3 中的位置,并创建非托管增量表table_b。如果我想更改这两个作业使用的位置和表名称,如何以保留数据、不会导致检查点问题并且花费最少时间的方式进行更改?必须保留这两个表,因为其他团队会读取这两个表。理想情况下,最终结果如下所示:
streaming_job_a从 kafka 读取,写入A_news3 中的位置并创建增量表table_a_newstreaming_job_b从 delta table 读取table_a_new,写入B_news3 中的位置,并创建 delta table table_b_new。我知道我可以从旧位置读取并写入新位置,如下所示:
incoming_df = spark.readStream.format("delta").table("table_a")
writer_df = (
incoming_df
.writeStream.format("delta")
.option("checkpointLocation", "A_new/_checkpoints")
.option("path", "A_new")
.trigger(once=True)
)
writer_df.start()
Run Code Online (Sandbox Code Playgroud)
然后创建新表:
spark.sql("create table table_a_new using delta location 'A_new'")
Run Code Online (Sandbox Code Playgroud)
然后执行类似的操作,但在这种方法中,我担心在迁移发生时丢失streaming_job_b写入位置的新数据。一般来说,我对 Spark Streaming 还很陌生,所以非常感谢任何建议!Astreaming_job_b
amazon-s3 apache-spark pyspark spark-structured-streaming delta-lake
delta-lake ×10
apache-spark ×5
databricks ×5
pyspark ×5
amazon-s3 ×1
hive ×1
merge ×1
parquet ×1
partitioning ×1
spark-koalas ×1