Koalas/pyspark 找不到数据源:delta

zyd*_*zyd 6 apache-spark pyspark databricks delta-lake spark-koalas

当我尝试在本地使用 koalas.DataFrame.to_delta() 将 Koalas DataFrame 直接写入增量表时,出现以下 Pyspark 异常:
java.lang.ClassNotFoundException: Failed to find data source: delta
编辑:忽略下面,直接调用 Pyspark 也会出现问题。

如果我将 Koalas DataFrame 转换为 Spark DataFrame 然后写入 delta,我似乎没有问题。是否存在 Koalas 不知道但 Pyspark 知道的底层库?看起来很奇怪,因为我认为在幕后使用相同的 Pyspark 模块...我应该注意到 Koalas to_delta() 方法似乎确实在 Databricks 上工作,这表明我的本地设置缺少与 Delta 相关的库。

失败的考拉代码:

kdf = ks.DataFrame({'eid': [1, 2, 3],
                        'contigName': ['chr1', 'chr2', 'chr3'],
                        'phen1': [0.123, 0.456, 0.789],
                        'phen2': [0.987, 0.654, 0.321]})
kdf.to_delta(path='tmp/test.delta', mode='overwrite')
Run Code Online (Sandbox Code Playgroud)

编辑:不让考拉 Spark 到 Delta 代码:

kdf = ks.DataFrame({'eid': [1, 2, 3],
                        'contigName': ['chr1', 'chr2', 'chr3'],
                        'phen1': [0.123, 0.456, 0.789],
                        'phen2': [0.987, 0.654, 0.321]})
kdf.to_spark().write.format('delta').mode('overwrite')
Run Code Online (Sandbox Code Playgroud)

另外,Koalas 和 Spark 保存到增量表的方式是否有任何差异需要注意?我有一个相当大的增量表,到目前为止已经使用 Koalas(在 Databricks 上)写入,但我可能会切换到 Spark.write 以使本地测试更容易。在我这样做之前,我想确保这两种方法的结果是相同的(我将做一些测试来确认这一点,只是好奇是否有人有关于切换现有增量表的写入策略的任何其他注释)。

编辑:好吧,我想 Pyspark 实际上也没有保存增量表,我忘记像哑巴一样将 .save() 添加到 Pyspark .write 调用中。所以我想我现在真正的问题是在本地运行 Pyspark 时如何包含 Delta 库/jar,特别是当我在 Pycharm 中运行单元测试时?

Ale*_*Ott 8

您只需要遵循文档即可。对于交互式 pyspark 可能是:

pyspark --packages io.delta:delta-core_2.12:1.0.0 \
  --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
Run Code Online (Sandbox Code Playgroud)

或使用代码(首先安装包pip install delta-spark):

from delta import *

builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()
Run Code Online (Sandbox Code Playgroud)

请注意,不同版本的 Delta 对 Spark 版本有不同的要求 - 请检查您的 Spark 版本的兼容性表。