小编svk*_*994的帖子

如何在“Scala”中按键减少 [Not In Spark]

我正在尝试在 Scala 中 reduceByKeys,是否有任何方法可以根据 Scala 中的键来减少值。[我知道我们可以通过 spark 中的 reduceByKey 方法来做,但是我们如何在 Scala 中做同样的事情?]

输入数据是:

val File = Source.fromFile("C:/Users/svk12/git/data/retail_db/order_items/part-00000")
                 .getLines()
                 .toList

 val map = File.map(x => x.split(","))
               .map(x => (x(1),x(4)))

  map.take(10).foreach(println)
Run Code Online (Sandbox Code Playgroud)

在上述步骤之后,我得到的结果为:

(2,250.0)
(2,129.99)
(4,49.98)
(4,299.95)
(4,150.0)
(4,199.92)
(5,299.98)
(5,299.95)
Run Code Online (Sandbox Code Playgroud)

预期结果 :

(2,379.99)
(5,499.93)
.......
Run Code Online (Sandbox Code Playgroud)

scala higher-order-functions

5
推荐指数
1
解决办法
1052
查看次数

如何使用apache Spark在mysql数据库中创建表

我正在尝试创建一个 Spark 应用程序,它对于创建、读取、写入和更新 MySQL 数据很有用。那么,有没有办法使用Spark创建MySQL表呢?

下面我有一个 Scala-JDBC 代码,它在 MySQL 数据库中创建一个表。我怎样才能通过 Spark 做到这一点?

package SparkMysqlJdbcConnectivity

import org.apache.spark.sql.SparkSession
import java.util.Properties
import java.lang.Class
import java.sql.Connection
import java.sql.DriverManager

object MysqlSparkJdbcProgram {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName("MysqlJDBC Connections")
      .master("local[*]")
      .getOrCreate()

    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://localhost:3306/world"
    val operationtype = "create table"
    val tablename = "country"
    val tablename2 = "state"

    val connectionProperties = new Properties()

    connectionProperties.put("user", "root")
    connectionProperties.put("password", "root")

    val jdbcDf = spark.read.jdbc(url, s"${tablename}", connectionProperties)

    operationtype.trim() match { …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql

2
推荐指数
1
解决办法
5983
查看次数