Ste*_*n S 2 java oracle dataframe apache-spark
我对 Spark 非常陌生,我有一个查询可以从两个 Oracle 表中获取数据。这些表必须由一个字段连接,它可以很好地与下面的代码一起使用。但是,我需要像在 Oracle“where”子句中一样应用过滤器。例如,带上年龄在 25 到 50 岁之间的员工。我还必须应用 GroupBy 过滤器并使用 OrderBy 对最终结果进行排序。问题是唯一正确执行的操作是从表中检索所有数据以及它们之间的连接。其余的过滤器根本没有应用,我不知道为什么。你能帮我解决这个问题吗?我确定我遗漏了一些东西,因为没有得到编译错误。数据加载正常,但“where”子句似乎对数据没有任何影响,尽管有些员工的年龄在 25 到 50 岁之间。
public static JavaRDD<Row> getResultsFromQuery(String connectionUrl) {
JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf()
.setAppName("SparkJdbcDs").setMaster("local"));
SQLContext sqlContext = new SQLContext(sparkContext);
Map<String, String> options = new HashMap<>();
options.put("driver", "oracle.jdbc.OracleDriver");
options.put("url", connectionUrl);
options.put("dbtable", "EMPLOYEE");
DataFrameReader dataFrameReader = sqlContext.read().format("jdbc")
.options(options);
DataFrame dataFrameFirstTable = dataFrameReader.load();
options.put("dbtable", "DEPARTMENT");
dataFrameReader = sqlContext.read().format("jdbc").options(options);
DataFrame dataFrameSecondTable = dataFrameReader.load();
//JOIN. IT WORKS JUST FINE!!!
DataFrame resultingDataFrame = dataFrameFirstTable.join(dataFrameSecondTable,
"DEPARTMENTID");
//FILTERS. THEY DO NOT THROW ERROR, BUT ARE NOT APPLIED. RESULTS ARE ALWAYS THE SAME, WITHOUT FILTERS
resultingDataFrame.where(resultingDataFrame.col("AGE").geq(25));
resultingDataFrame.where(resultingDataFrame.col("AGE").leq(50));
JavaRDD<Row> resultFromQuery = resultingDataFrame.toJavaRDD();
//HERE I CONFIRM THAT THE NUMBER OF ROWS GOTTEN IS ALWAYS THE SAME, SO THE FILTERS DO NOT WORK.
System.out.println("Number of rows "+resultFromQuery.count());
return resultFromQuery;
Run Code Online (Sandbox Code Playgroud)
}
where 返回一个新的数据帧并且不会改变现有的数据帧,因此您需要存储输出:
DataFrame greaterThan25 = resultingDataFrame.where(resultingDataFrame.col("AGE").geq(25));
DataFrame lessThanGreaterThan = greaterThan25.where(resultingDataFrame.col("AGE").leq(50));
JavaRDD<Row> resultFromQuery = lessThanGreaterThan.toJavaRDD();
Run Code Online (Sandbox Code Playgroud)
或者你可以把它链接起来:
DataFrame resultingDataFrame = dataFrameFirstTable.join(dataFrameSecondTable, "DEPARTMENTID")
.where(resultingDataFrame.col("AGE").geq(25))
.where(resultingDataFrame.col("AGE").leq(50));
Run Code Online (Sandbox Code Playgroud)