Post by Vin*_*nan

Task not serializable in Flink

I am trying to run the basic PageRank example in Flink with a slight modification (only in how the input file is read; everything else is the same), and I get a "Task not serializable" error. Part of the error output is shown below:

at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:179)
at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:171)

Here is my code:

object hpdb {

  def main(args: Array[String]) {

    val env = ExecutionEnvironment.getExecutionEnvironment

    val maxIterations = 10000

    val DAMPENING_FACTOR: Double = 0.85

    val EPSILON: Double = 0.0001

    val outpath = "/home/vinoth/bigdata/assign10/pagerank.csv"

    val links = env.readCsvFile[Tuple2[Long, Long]]("/home/vinoth/bigdata/assign10/ppi.csv",
                fieldDelimiter = "\t", includedFields = Array(1, 4)).as('sourceId, 'targetId).toDataSet[Link] // source and target

    val pages = env.readCsvFile[Tuple1[Long]]("/home/vinoth/bigdata/assign10/ppi.csv",
      fieldDelimiter = "\t", includedFields = Array(1)).as('pageId).toDataSet[Id] // page id

    val noOfPages = pages.count()

    val pagesWithRanks = pages.map(p => Page(p.pageId, 1.0 / noOfPages))

    val adjacencyLists = links
      // initialize …
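The snippet references the case classes `Link`, `Id`, and `Page` without showing their definitions. Their field names appear in the code above, so a minimal sketch of what they presumably look like (the actual definitions in the question are not shown, so this is an assumption):

```scala
// Assumed shapes for the types referenced in the snippet above;
// field names are taken from the .as(...) calls and the map on pages.
case class Link(sourceId: Long, targetId: Long) // one edge: source -> target
case class Id(pageId: Long)                     // a bare page id
case class Page(pageId: Long, rank: Double)     // a page with its current rank

object CaseClassSketch {
  def main(args: Array[String]): Unit = {
    val l = Link(1L, 4L)
    val p = Page(1L, 1.0 / 3)
    println(s"${l.sourceId} -> ${l.targetId}, rank ${p.rank}")
  }
}
```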
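For context on what `ClosureCleaner.ensureSerializable` is checking: a common cause of this error (not necessarily the one in the code above) is a user function that captures a non-serializable object from its enclosing scope, which Flink must serialize to ship the function to the task managers. A minimal, Flink-free illustration of that failure mode, using a hypothetical `NonSerializableHelper` class:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for a non-serializable object (e.g. an environment or a client handle).
class NonSerializableHelper {
  def weight(x: Long): Double = x * 0.85
}

object ClosureCaptureDemo {
  def main(args: Array[String]): Unit = {
    val helper = new NonSerializableHelper

    // The closure captures `helper`; Java serialization of the closure
    // then tries (and fails) to serialize the captured helper as well.
    val f: Long => Double = x => helper.weight(x)

    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(f)
      println("serialized OK")
    } catch {
      case _: NotSerializableException => println("Task not serializable")
    }
  }
}
```

The usual fixes are to make the captured class `Serializable`, or to create the helper inside the function (e.g. in a `RichMapFunction.open`) so it is never part of the serialized closure.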

scala apache-flink

4 votes
1 answer
1499 views
