zer*_*323 57 python scala apache-spark apache-spark-sql pyspark
这个问题的目标是记录:
在PySpark中使用JDBC连接读取和写入数据所需的步骤
JDBC源和已知解决方案可能存在的问题
通过小的更改,这些方法应该与其他支持的语言一起使用,包括Scala和R.
zer*_*323 100
提交应用程序或启动shell时,请包含适用的JDBC驱动程序.您可以使用例如--packages:
bin/pyspark --packages group:name:version
Run Code Online (Sandbox Code Playgroud)
或合并driver-class-path和jars
bin/pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR
Run Code Online (Sandbox Code Playgroud)
PYSPARK_SUBMIT_ARGS在JVM实例启动或使用conf/spark-defaults.conf设置spark.jars.packages或spark.jars/ 之前,也可以使用环境变量设置这些属性spark.driver.extraClassPath.
选择所需的模式.Spark JDBC writer支持以下模式:
append:追加此内容:class:DataFrameto existing data.overwrite:覆盖现有数据.ignore:如果数据已存在,请静默忽略此操作.error(默认情况):如果数据已存在则抛出异常.
不支持 Upserts或其他细粒度修改
mode = ...
Run Code Online (Sandbox Code Playgroud)准备JDBC URI,例如:
# You can encode credentials in URI or pass
# separately using properties argument
# of jdbc method or options
url = "jdbc:postgresql://localhost/foobar"
Run Code Online (Sandbox Code Playgroud)(可选)创建JDBC参数的字典.
properties = {
"user": "foo",
"password": "bar"
}
Run Code Online (Sandbox Code Playgroud)
properties/ options也可用于设置支持的JDBC连接属性.
使用 DataFrame.write.jdbc
df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)
Run Code Online (Sandbox Code Playgroud)
保存数据(详见pyspark.sql.DataFrameWriter).
已知问题:
使用--packages(java.sql.SQLException: No suitable driver found for jdbc: ...)包含驱动程序时找不到合适的驱动程序
假设没有驱动程序版本不匹配来解决此问题,您可以将driver类添加到properties.例如:
properties = {
...
"driver": "org.postgresql.Driver"
}
Run Code Online (Sandbox Code Playgroud)使用df.write.format("jdbc").options(...).save()可能导致:
java.lang.RuntimeException:org.apache.spark.sql.execution.datasources.jdbc.DefaultSource不允许将create table作为select.
解决方案未知
在Pyspark 1.3中,您可以尝试直接调用Java方法:
df._jdf.insertIntoJDBC(url, "baz", True)
Run Code Online (Sandbox Code Playgroud)用途sqlContext.read.jdbc:
sqlContext.read.jdbc(url=url, table="baz", properties=properties)
Run Code Online (Sandbox Code Playgroud)
或者sqlContext.read.format("jdbc"):
(sqlContext.read.format("jdbc")
.options(url=url, dbtable="baz", **properties)
.load())
Run Code Online (Sandbox Code Playgroud)已知问题和陷阱:
Spark SQL支持使用JDBC源进行谓词下推,尽管并非所有谓词都可以下推.它也不会委托限制或聚合.可能的解决方法是使用有效的子查询替换dbtable/ tableargument.参见例如:
默认情况下,JDBC数据源使用单个执行程序线程按顺序加载数据.要确保分布式数据加载,您可以:
column(必须IntegeType)lowerBound,upperBound,numPartitions.predicates每个谓词分区一个.看到:
在分布式模式(具有分区列或谓词)中,每个执行程序在其自己的事务中操作.如果同时修改源数据库,则无法保证最终视图将保持一致.
Maven存储库(用于获取--packages所选版本所需的坐标和从compile-group:name:version替换相应字段的表单中的Gradle选项卡复制数据)或Maven Central Repository:
根据数据库的不同,可能存在专用源,在某些情况下首选:
| 归档时间: |
|
| 查看次数: |
59721 次 |
| 最近记录: |