Posts by cod*_*ure

Duplicate Spark context in an IntelliJ worksheet

I have the following worksheet in IntelliJ:

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

/** Lazily instantiated singleton instance of SQLContext */
object SQLContextSingleton {
  @transient private var instance: SQLContext = _
  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}

val conf = new SparkConf().
  setAppName("Scala Worksheet").
  setMaster("local[*]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.json("/Users/someuser/some.json")
df.show

This code works in the REPL, but it seems to run only the first time (and even then with some other errors). On every subsequent run, the error is:

16/04/13 11:04:57 WARN SparkContext: Another SparkContext is being constructed (or threw an exception …
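
One way around this (a minimal sketch, assuming Spark 1.4+, where SparkContext.getOrCreate and SQLContext.getOrCreate are available) is to reuse whatever context is already live in the JVM instead of constructing a new one on every worksheet evaluation:

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().
  setAppName("Scala Worksheet").
  setMaster("local[*]")

// getOrCreate returns the existing context if one is live, so
// re-evaluating the worksheet no longer constructs a second SparkContext.
val sc = SparkContext.getOrCreate(conf)
val sqlContext = SQLContext.getOrCreate(sc)

val df = sqlContext.read.json("/Users/someuser/some.json")
df.show()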

scala intellij-idea apache-spark apache-spark-sql

10 votes · 2 answers · 1556 views

Spark Scala: getting data back from rdd.foreachPartition

I have some code like this:

      println("\nBEGIN Last Revs Class: "+ distinctFileGidsRDD.getClass)
      val lastRevs = distinctFileGidsRDD.
        foreachPartition(iter => {
          SetupJDBC(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)
          while(iter.hasNext) {
            val item = iter.next()
            //println(item(0))
            println("String: "+item(0).toString())
            val jsonStr = DB.readOnly { implicit session =>
              sql"SELECT jsonStr FROM lasttail WHERE fileGId = ${item(0)}::varchar".
                map { resultSet => resultSet.string(1) }.single.apply()
            }
            println("\nJSON: "+jsonStr)
          }
        })
      println("\nEND Last Revs Class: "+ lastRevs.getClass)

The code's output (heavily edited) looks something like:

BEGIN Last Revs Class: class org.apache.spark.rdd.MapPartitionsRDD
String: 1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM
JSON: Some({"Struct":{"fileGid":"1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM",... )
String: 1eY2wxoVq17KGMUBzCZZ34J9gSNzF038grf5RP38DUxw
JSON: Some({"Struct":{"fileGid":"1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM",... )
...
JSON: None() …
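
Note that foreachPartition is an action that returns Unit, which is why lastRevs above carries no data back. A minimal sketch of the usual alternative, mapPartitions, which produces a new RDD of results (this assumes the question's own SetupJDBC helper and JDBC parameters, and the scalikejdbc DB / sql API exactly as used above):

import scalikejdbc._

val lastRevs = distinctFileGidsRDD.mapPartitions { iter =>
  // One connection-pool setup per partition, as in the original code.
  SetupJDBC(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)
  iter.map { item =>
    DB.readOnly { implicit session =>
      sql"SELECT jsonStr FROM lasttail WHERE fileGId = ${item(0)}::varchar".
        map { resultSet => resultSet.string(1) }.single.apply()
    }
  }
}

// mapPartitions is lazy; an action such as collect brings the
// Option[String] results back to the driver.
lastRevs.collect().foreach(println)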

scala apache-spark scalikejdbc spark-streaming

7 votes · 1 answer · 10k views

Reusing the schema from JSON in a Spark DataFrame using Scala

I have some JSON data like this:

{"gid":"111","createHour":"2014-10-20 01:00:00.0","revisions":[{"revId":"2","modDate":"2014-11-20 01:40:37.0"},{"revId":"4","modDate":"2014-11-20 01:40:40.0"}],"comments":[],"replies":[]}
{"gid":"222","createHour":"2014-12-20 01:00:00.0","revisions":[{"revId":"2","modDate":"2014-11-20 01:39:31.0"},{"revId":"4","modDate":"2014-11-20 01:39:34.0"}],"comments":[],"replies":[]}
{"gid":"333","createHour":"2015-01-21 00:00:00.0","revisions":[{"revId":"25","modDate":"2014-11-21 00:34:53.0"},{"revId":"110","modDate":"2014-11-21 00:47:10.0"}],"comments":[{"comId":"4432","content":"How are you?"}],"replies":[{"repId":"4441","content":"I am good."}]}
{"gid":"444","createHour":"2015-09-20 23:00:00.0","revisions":[{"revId":"2","modDate":"2014-11-20 23:23:47.0"}],"comments":[],"replies":[]}
{"gid":"555","createHour":"2016-01-21 01:00:00.0","revisions":[{"revId":"135","modDate":"2014-11-21 01:01:58.0"}],"comments":[],"replies":[]}
{"gid":"666","createHour":"2016-04-23 19:00:00.0","revisions":[{"revId":"136","modDate":"2014-11-23 19:50:51.0"}],"comments":[],"replies":[]}

I can read it with:

val df = sqlContext.read.json("./data/full.json")

and I can print the schema with df.printSchema:

root
 |-- comments: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- comId: string (nullable = true)
 |    |    |-- content: string (nullable = true)
 |-- createHour: string (nullable = true)
 |-- gid: string (nullable = true)
 |-- replies: array (nullable = …
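
Once the schema has been inferred, it can be captured from the DataFrame and reused so that later reads skip the inference pass. A minimal sketch, assuming the Spark 1.x API used in the question (the round-trip through DataType.fromJson is one way to persist the schema between runs; the file path is the question's own):

import org.apache.spark.sql.types.{DataType, StructType}

// Capture the schema inferred by the first read.
val schema: StructType = df.schema

// Reuse it on subsequent reads: Spark applies it directly
// instead of scanning the JSON to infer it again.
val df2 = sqlContext.read.schema(schema).json("./data/full.json")

// The schema also serializes to JSON, so it can be saved and restored.
val schemaJson: String = schema.json
val restored = DataType.fromJson(schemaJson).asInstanceOf[StructType]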

json scala apache-spark apache-spark-sql

6 votes · 2 answers · 8977 views