虽然通过在星火JDBC连接获取来自SQL Server的数据,我发现我可以设置一些并行的参数,如partitionColumn,lowerBound,upperBound,和numPartitions.我已经通过spark文档,但无法理解它.
谁能解释一下这些参数的含义?
我在集群模式下运行spark并通过JDBC从RDBMS读取数据.
根据Spark 文档,这些分区参数描述了在从多个worker并行读取时如何对表进行分区:
partitionColumnlowerBoundupperBoundnumPartitions这些是可选参数.
如果我不指定这些,会发生什么:
我numPartitions对以下方法中参数的行为感到困惑:
DataFrameReader.jdbcDataset.repartition在官方的文档中DataFrameReader.jdbc的发言权就下列numPartitions参数
numPartitions : 分区数。这与lowerBound(包含)、upperBound(不包含)一起形成用于生成的WHERE 子句表达式的分区步幅,用于均匀地拆分列columnName。
而官方的文档的Dataset.repartition发言权
返回一个具有精确
numPartitions分区的新数据集。
我目前的理解:
numPartition参数DataFrameReader.jdbc控制从数据库读取数据的并行度numPartition参数Dataset.repartition控制输出文件的数量时,这将生成DataFrame将被写入到磁盘我的问题:
DataFrame通过读取DataFrameReader.jdbc然后将其写入磁盘(不调用repartition方法),那么输出中的文件是否仍然与我DataFrame在调用后将其写到磁盘repartition上的文件一样多?repartition在DataFrame使用DataFrameReader.jdbc方法(带numPartitions参数)读取的方法上调用方法是多余的吗?numPartitions,DataFrameReader.jdbc方法的参数不应该被称为“并行”之类的东西吗?我想询问哪个配置选项在spark中具有优先权?它是配置文件还是我们在运行spark-submit shell时手动指定的选项?如果我的配置文件中有不同的执行程序内存选项,并且在运行spark-submit shell时指定了不同的值,该怎么办?
In my application I compare two different Datasets(i.e source table from Hive and Destination from RDBMS) for duplications and mis-matches, it works fine with smaller dataset but when I try to compare data more that 1GB (source alone) it hangs and throws TIMEOUT ERROR, I tried .config("spark.network.timeout", "600s") even after increasing the network timeout it throwing java.lang.OutOfMemoryError: GC overhead limit exceeded.
val spark = SparkSession.builder().master("local")
.appName("spark remote")
.config("javax.jdo.option.ConnectionURL", "jdbc:mysql://192.168.175.160:3306/metastore?useSSL=false")
.config("javax.jdo.option.ConnectionUserName", "hiveroot")
.config("javax.jdo.option.ConnectionPassword", "hivepassword")
.config("hive.exec.scratchdir", "/tmp/hive/${user.name}")
.config("hive.metastore.uris", "thrift://192.168.175.160:9083")
.enableHiveSupport() …Run Code Online (Sandbox Code Playgroud)