如何在Parallel中的Spark中从DB读取数据

Sau*_*rma 4 apache-spark apache-spark-sql spark-dataframe

我需要使用Spark SQL从DB2数据库读取数据(因为不存在Sqoop)

我知道这个功能,它将通过打开多个连接以并行方式读取数据

jdbc(url: String, table: String, columnName: String, lowerBound: Long,upperBound: Long, numPartitions: Int, connectionProperties: Properties)

我的问题是我没有像这样的增量列。我还需要通过Query读取数据,因为我的表很大。是否有人知道通过API读取数据的方式,还是我必须自己创建一些内容

Ale*_*lex 5

您不需要标识列并行读取,并且table变量仅指定源。注册表后,您可以使用WHERE子句使用Spark SQL查询限制从表中读取的数据。如果这不是一个选项,你可以使用一个视图代替,或在此描述,你也可以使用任意的子查询为您的表输入。

val dataframe = sqlContext.read.format("jdbc").option("url", "jdbc:db2://localhost/sparksql").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "table").option("user", "root").option("password", "root").load()
dataframe.registerTempTable("table")
dataframe.sqlContext.sql("select * from table where dummy_flag=1").collect.foreach(println)
Run Code Online (Sandbox Code Playgroud)


小智 5

Saurabh,为了使用标准的Spark JDBC数据源支持并行读取,您确实需要按预期的方式使用numPartitions选项。

但是您需要给Spark一些提示,以将读取的SQL语句拆分为多个并行的语句。因此,您需要某种整数分区列,其中具有确定的max和min值。

如果您的DB2系统是MPP分区,则已经存在一个隐式分区,您实际上可以利用这一事实并并行读取每个DB2数据库分区:

var df = spark.read.
format("jdbc").
option("url", "jdbc:db2://<DB2 server>:<DB2 port>/<dbname>").
option("user", "<username>").
option("password", "<password>").
option("dbtable", "<your table>").
option("partitionColumn", "DBPARTITIONNUM(<a column name>)").
option("lowerBound", "<lowest partition number>").
option("upperBound", "<largest partition number>").
option("numPartitions", "<number of partitions>").
load()
Run Code Online (Sandbox Code Playgroud)

如您所见,DBPARTITIONNUM()函数是此处的分区键。

万一您不知道DB2 MPP系统的分区,可以通过以下方法使用SQL找出它:

SELECT min(member_number), max(member_number), count(member_number) 
FROM TABLE(SYSPROC.DB_MEMBERS())
Run Code Online (Sandbox Code Playgroud)

如果使用多个分区组,并且不同的表可以分布在不同的分区集上,则可以使用此SQL找出每个表的分区列表:

SELECT t2.DBPARTITIONNUM, t3.HOST_NAME
 FROM SYSCAT.TABLESPACES as t1,  SYSCAT.DBPARTITIONGROUPDEF as t2,
      SYSCAT.TABLES t4, TABLE(SYSPROC.DB_MEMBERS()) as t3 
 WHERE t1.TBSPACEID = t4.TBSPACEID AND
       t4.TABSCHEMA='<myschema>' AND
       t4.TABNAME='<mytab>' AND
       t1.DBPGNAME = t2.DBPGNAME AND
       t2.DBPARTITIONNUM = t3.PARTITION_NUMBER;
Run Code Online (Sandbox Code Playgroud)