Spark Scala从rdd.foreachPartition获取数据

cod*_*ure 7 scala apache-spark scalikejdbc spark-streaming

我有一些像这样的代码:

      println("\nBEGIN Last Revs Class: "+ distinctFileGidsRDD.getClass)
      val lastRevs = distinctFileGidsRDD.
        foreachPartition(iter => {
          SetupJDBC(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)
          while(iter.hasNext) {
            val item = iter.next()
            //println(item(0))
            println("String: "+item(0).toString())
            val jsonStr = DB.readOnly { implicit session =>
              sql"SELECT jsonStr FROM lasttail WHERE fileGId = ${item(0)}::varchar".
                map { resultSet => resultSet.string(1) }.single.apply()
            }
            println("\nJSON: "+jsonStr)
          }
        })
      println("\nEND Last Revs Class: "+ lastRevs.getClass)
Run Code Online (Sandbox Code Playgroud)

代码输出(带有大量编辑)类似于:

BEGIN Last Revs Class: class org.apache.spark.rdd.MapPartitionsRDD
String: 1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM
JSON: Some({"Struct":{"fileGid":"1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM",... )
String: 1eY2wxoVq17KGMUBzCZZ34J9gSNzF038grf5RP38DUxw
JSON: Some({"Struct":{"fileGid":"1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM",... )
...
JSON: None()
END Last Revs Class: void
Run Code Online (Sandbox Code Playgroud)

问题1:如何将lastRevs值设置为有用的格式,如JSON字符串/ null或像Some/None这样的选项?

问题2:我的偏好:是否有另一种方法可以获得类似RDD格式的分区数据(而不是迭代器格式)?

dstream.foreachRDD { (rdd, time) =>
  rdd.foreachPartition { partitionIterator =>
    val partitionId = TaskContext.get.partitionId()
    val uniqueId = generateUniqueId(time.milliseconds, partitionId)
    // use this uniqueId to transactionally commit the data in partitionIterator
  } 
}
Run Code Online (Sandbox Code Playgroud)

来自http://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning

问题3:获取数据的方法是我使用的是理智的方法(假设我遵循上面的链接)?(暂时不考虑这是一个scalikejdbc系统JDBC.这将是除了这个原型以外的某种类型的关键值存储.)

maa*_*asg 5

要创建使用执行程序本地资源(例如DB或网络连接)的转换,您应该使用rdd.mapPartitions.它允许在本地将一些代码初始化为执行程序,并使用这些本地资源来处理分区中的数据.

代码应如下所示:

 val lastRevs = distinctFileGidsRDD.
        mapPartitions{iter => 
          SetupJDBC(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)
          iter.map{ element => 
            DB.readOnly { implicit session =>
              sql"SELECT jsonStr FROM lasttail WHERE fileGId = ${element(0)}::varchar"
              .map { resultSet => resultSet.string(1) }.single.apply()
            }
          }
        }
Run Code Online (Sandbox Code Playgroud)

  • @lisak不,`foreachPartition`和`mapPartitions`都可以让你在执行程序上运行代码.区别在于`foreachPartition'只做副作用(比如写入db),而`mapPartitions`返回一个值.这个问题的关键是"如何获取数据"因此`mapPartitions`是要走的路. (3认同)