我正在尝试在 Scala 中 reduceByKeys,是否有任何方法可以根据 Scala 中的键来减少值。[我知道我们可以通过 spark 中的 reduceByKey 方法来做,但是我们如何在 Scala 中做同样的事情?]
输入数据是:
val File = Source.fromFile("C:/Users/svk12/git/data/retail_db/order_items/part-00000")
.getLines()
.toList
val map = File.map(x => x.split(","))
.map(x => (x(1),x(4)))
map.take(10).foreach(println)
Run Code Online (Sandbox Code Playgroud)
在上述步骤之后,我得到的结果为:
(2,250.0)
(2,129.99)
(4,49.98)
(4,299.95)
(4,150.0)
(4,199.92)
(5,299.98)
(5,299.95)
Run Code Online (Sandbox Code Playgroud)
预期结果 :
(2,379.99)
(5,499.93)
.......
Run Code Online (Sandbox Code Playgroud) 我正在尝试创建一个 Spark 应用程序,它对于创建、读取、写入和更新 MySQL 数据很有用。那么,有没有办法使用Spark创建MySQL表呢?
下面我有一个 Scala-JDBC 代码,它在 MySQL 数据库中创建一个表。我怎样才能通过 Spark 做到这一点?
package SparkMysqlJdbcConnectivity
import org.apache.spark.sql.SparkSession
import java.util.Properties
import java.lang.Class
import java.sql.Connection
import java.sql.DriverManager
object MysqlSparkJdbcProgram {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("MysqlJDBC Connections")
.master("local[*]")
.getOrCreate()
val driver = "com.mysql.jdbc.Driver"
val url = "jdbc:mysql://localhost:3306/world"
val operationtype = "create table"
val tablename = "country"
val tablename2 = "state"
val connectionProperties = new Properties()
connectionProperties.put("user", "root")
connectionProperties.put("password", "root")
val jdbcDf = spark.read.jdbc(url, s"${tablename}", connectionProperties)
operationtype.trim() match { …Run Code Online (Sandbox Code Playgroud)