标签: spark-jdbc

Spark:read.jdbc(..numPartitions..) 和 repartition(..numPartitions..) 中 numPartitions 的区别

numPartitions对以下方法中参数的行为感到困惑:

  1. DataFrameReader.jdbc
  2. Dataset.repartition

官方的文档DataFrameReader.jdbc的发言权就下列numPartitions参数

numPartitions : 分区数。这与lowerBound(包含)、upperBound(不包含)一起形成用于生成的WHERE 子句表达式的分区步幅,用于均匀地拆分列columnName。

官方的文档Dataset.repartition发言权

返回一个具有精确numPartitions分区的新数据集。


我目前的理解:

  1. 方法中的numPartition参数DataFrameReader.jdbc控制从数据库读取数据的并行度
  2. numPartition参数Dataset.repartition控制输出文件的数量时,这将生成DataFrame将被写入到磁盘

我的问题:

  1. 如果我DataFrame通过读取DataFrameReader.jdbc然后将其写入磁盘(不调用repartition方法),那么输出中的文件是否仍然与我DataFrame在调用后将其写到磁盘repartition上的文件一样多?
  2. 如果以上问题的答案是:
    • 是:那么repartitionDataFrame使用DataFrameReader.jdbc方法(带numPartitions参数)读取的方法上调用方法是多余的吗?
    • 否:那么请纠正我的理解错误。同样在这种情况下numPartitionsDataFrameReader.jdbc方法的参数不应该被称为“并行”之东西吗?

dataframe apache-spark spark-dataframe spark-jdbc

7
推荐指数
1
解决办法
1万
查看次数

如何在 pyspark 中使用 azure-sqldb-spark 连接器

我想每天使用 PySpark 将大约 10 GB 的数据写入 Azure SQL 服务器数据库。目前使用 JDBC 驱动程序,这需要几个小时一个一个地插入语句。

我打算使用 azure-sqldb-spark 连接器,它声称使用批量插入来增强写入。

我浏览了官方文档:https : //github.com/Azure/azure-sqldb-spark。该库是用 Scala 编写的,基本上需要使用 2 个 Scala 类:

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val bulkCopyConfig = Config(Map(
  "url"               -> "mysqlserver.database.windows.net",
  "databaseName"      -> "MyDatabase",
  "user"              -> "username",
  "password"          -> "*********",
  "databaseName"      -> "MyDatabase",
  "dbTable"           -> "dbo.Clients",
  "bulkCopyBatchSize" -> "2500",
  "bulkCopyTableLock" -> "true",
  "bulkCopyTimeout"   -> "600"
))

df.bulkCopyToSqlDB(bulkCopyConfig)
Run Code Online (Sandbox Code Playgroud)

可以像这样在pyspark中使用吗(使用sc._jvm):

Config = sc._jvm.com.microsoft.azure.sqldb.spark.config.Config
connect= sc._jvm.com.microsoft.azure.sqldb.spark.connect._

//all config

df.connect.bulkCopyToSqlDB(bulkCopyConfig)
Run Code Online (Sandbox Code Playgroud)

我不是 Python 专家。任何人都可以用完整的代码段帮助我完成这项工作。

azure apache-spark pyspark spark-jdbc

7
推荐指数
1
解决办法
4077
查看次数

如何在可移植 JDBC 中获取数组类型的基类型

如果您有一个表,其中的列类型为 SQL ARRAY,那么如何找到数组类型的基本类型,即数组类型的各个元素的类型?

  • 如何在与供应商无关的纯 JDBC 中做到这一点?
  • 在不获取和检查实际行数据的情况下如何做到这一点?同样:如果表是空的怎么办?

这里也提出了类似的问题:

然而,我要求通过 JDBC API 本身提供一种与供应商无关的方式。我问:如何使用与供应商无关的纯 JDBC 来解决这个问题?这个用例似乎是 JDBC 的核心用例,我真的很惊讶我在 JDBC 中找不到解决方案。

我花了几个小时反复阅读 JDBC API javadocs,又花了几个小时在互联网上搜索,令我感到非常惊讶的是,似乎没有通过 JDBC API 执行此操作的正确方法。它应该通过 DatabaseMetaData 或 ResultSetMetaData 就在那里,但显然不是。

以下是我发现的不足的解决方法和替代方案。

  • 获取一些行,直到获得具有该列的实际值的行,获取列值,转换为 java.sql.Array,然后调用 getBaseType。
  • 对于 postgres,假设 SQL ARRAY 类型名称编码为 ("_" + baseTypeName)。
  • 对于 Oracle,请使用 Oracle 特定扩展来获取答案。
  • 某些数据库有一个特殊的“element_types”视图,其中包含当前表等使用的每个 SQL ARRAY 类型的一行,并且该行包含基本类型和基本类型名称。

我的背景是,我想在我公司产品的 Spark in Cloud 中使用供应商提供的 JDBC 连接器,并且元数据发现变得很重要。我还在研究自己为其他还没有 JDBC 驱动程序或 Spark 连接器的数据源编写 JDBC 连接器的可行性。元数据发现非常重要,这样人们才能正确定义 Spark InternalRow 和 Spark-JDBC 数据获取器。目前,Spark-JDBC 对 SQL ARRAY 和 SQL STRUCT 的支持非常有限,但我设法用一两天的编码来提供缺失的位,但在此过程中我遇到了这个阻碍我的问题。如果我可以控制 JDBC 驱动程序实现,那么我可以使用拼凑(即在类型名称中编码类型信息,并在 Spark JdbcDialect …

sql jdbc apache-spark apache-spark-sql spark-jdbc

7
推荐指数
0
解决办法
258
查看次数

Spark-jdbc 中的准备语句

我正在尝试使用具有指定偏移量的 Spark jdbc 从 MSSQL 数据库读取数据。因此,应该仅在指定的时间戳(即该偏移量)之后加载数据。我尝试通过在 jdbc 配置中提供查询来实现它,但是,我没有找到使用参数化值创建准备好的语句的可能性。在这种情况下,我想参数化一个偏移量,该偏移量在每次应用程序启动后都会改变。如何使用 jdbc 选项来实现它?

所有数据库配置都位于 application.conf 文件中。这是我从数据库读取的方式:

def jdbcOptions(query: String) = Map[String,String](
    "driver" -> config.getString("sqlserver.db.driver"),
    "url" -> config.getString("sqlserver.db.url"),
    "dbtable" -> s"(select * from TestAllData where update_database_time >= '2019-03-19 12:30:00.003') as subq,
    "user" -> config.getString("sqlserver.db.user"),
    "password" -> config.getString("sqlserver.db.password"),
    "customSchema" -> config.getString("sqlserver.db.custom_schema")
  )

    val testDataDF = sparkSession
      .read
      .format("jdbc")
      .options(jdbcOptions())
      .load()
Run Code Online (Sandbox Code Playgroud)

相反,查询应该看起来几乎像这样:

s"(select * from TestAllData where update_database_time >= $tmstp) as subq
Run Code Online (Sandbox Code Playgroud)

sql-server apache-spark spark-jdbc

5
推荐指数
1
解决办法
5057
查看次数

Pseudocolumn in Spark JDBC

I am using a query to fetch data from MYSQL as follows:

var df = spark.read.format("jdbc")
         .option("url", "jdbc:mysql://10.0.0.192:3306/retail_db")
         .option("driver" ,"com.mysql.jdbc.Driver")
         .option("user", "retail_dba")
         .option("password", "cloudera")
         .option("dbtable", "orders")
         .option("partitionColumn", "order_id")
         .option("lowerBound", "1")
         .option("upperBound", "68883")
         .option("numPartitions", "4")
         .load() 
Run Code Online (Sandbox Code Playgroud)

Question is, can I use a pseudo column (like ROWNUM in Oracle or RRN(employeeno) in DB2) with option where I specify the partitionColumn ?

If not, can we specify a partition column which is not a primary key ?

apache-spark apache-spark-sql spark-jdbc

3
推荐指数
1
解决办法
1403
查看次数