如何使用JDBC源在(Py)Spark中写入和读取数据?

zer*_*323 57 python scala apache-spark apache-spark-sql pyspark

这个问题的目标是记录:

  • 在PySpark中使用JDBC连接读取和写入数据所需的步骤

  • JDBC源和已知解决方案可能存在的问题

通过小的更改,这些方法应该与其他支持的语言一起使用,包括Scala和R.

zer*_*323 100

写数据

  1. 提交应用程序或启动shell时,请包含适用的JDBC驱动程序.您可以使用例如--packages:

    bin/pyspark --packages group:name:version  
    
    Run Code Online (Sandbox Code Playgroud)

    或合并driver-class-pathjars

    bin/pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR
    
    Run Code Online (Sandbox Code Playgroud)

    PYSPARK_SUBMIT_ARGS在JVM实例启动或使用conf/spark-defaults.conf设置spark.jars.packagesspark.jars/ 之前,也可以使用环境变量设置这些属性spark.driver.extraClassPath.

  2. 选择所需的模式.Spark JDBC writer支持以下模式:

    • append:追加此内容:class:DataFrameto existing data.
    • overwrite:覆盖现有数据.
    • ignore:如果数据已存在,请静默忽略此操作.
    • error (默认情况):如果数据已存在则抛出异常.

    不支持 Upserts或其他细粒度修改

    mode = ...
    
    Run Code Online (Sandbox Code Playgroud)
  3. 准备JDBC URI,例如:

    # You can encode credentials in URI or pass
    # separately using properties argument
    # of jdbc method or options
    
    url = "jdbc:postgresql://localhost/foobar"
    
    Run Code Online (Sandbox Code Playgroud)
  4. (可选)创建JDBC参数的字典.

    properties = {
        "user": "foo",
        "password": "bar"
    }
    
    Run Code Online (Sandbox Code Playgroud)

    properties/ options也可用于设置支持的JDBC连接属性.

  5. 使用 DataFrame.write.jdbc

    df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)
    
    Run Code Online (Sandbox Code Playgroud)

    保存数据(详见pyspark.sql.DataFrameWriter).

已知问题:

  • 使用--packages(java.sql.SQLException: No suitable driver found for jdbc: ...)包含驱动程序时找不到合适的驱动程序

    假设没有驱动程序版本不匹配来解决此问题,您可以将driver类添加到properties.例如:

    properties = {
        ...
        "driver": "org.postgresql.Driver"
    }
    
    Run Code Online (Sandbox Code Playgroud)
  • 使用df.write.format("jdbc").options(...).save()可能导致:

    java.lang.RuntimeException:org.apache.spark.sql.execution.datasources.jdbc.DefaultSource不允许将create table作为select.

    解决方案未知

  • 在Pyspark 1.3中,您可以尝试直接调用Java方法:

    df._jdf.insertIntoJDBC(url, "baz", True)
    
    Run Code Online (Sandbox Code Playgroud)

读数据

  1. 按照写入数据的步骤1-4
  2. 用途sqlContext.read.jdbc:

    sqlContext.read.jdbc(url=url, table="baz", properties=properties)
    
    Run Code Online (Sandbox Code Playgroud)

    或者sqlContext.read.format("jdbc"):

    (sqlContext.read.format("jdbc")
        .options(url=url, dbtable="baz", **properties)
        .load())
    
    Run Code Online (Sandbox Code Playgroud)

已知问题和陷阱:

哪里可以找到合适的司机:

其他选择

根据数据库的不同,可能存在专用源,在某些情况下首选:


归档时间:

查看次数:

59721 次

最近记录:

7 年 前