Spark Flume Streaming scala foreachpartition

Asked by Jar*_*ims · Tags: scala, flume, apache-spark, spark-streaming

I'm trying to process events from a Flume Avro sink with Spark Streaming, using the foreachRDD design pattern, but for some reason the code labeled "DOES NOT WORK" never executes. rdd.partitions.size returns 1, yet it doesn't iterate over even that one partition. P.S. I'm a Scala noob.

events.foreachRDD { rdd =>
  if (rdd.take(1).size == 1) {
    System.out.println("**********************************WE GOT AN RDD")
    System.out.println("*******************************NUM PARTITIONS = " + rdd.partitions.size)

    // Collecting to the driver works: every event body prints here.
    val array = rdd.collect()
    array.foreach { x =>
      System.out.println("**************WORKS********************" + new String(x.event.getBody().array(), "UTF-8"))
    }

    // But this block seems to never run:
    rdd.foreachPartition { partitionItr =>
      //System.out.println("**********************************WE NEVER GET HERE " + partitionItr.size)
      //val connection = ConnectionPool.getConnection()  // take a db connection from the pool
      partitionItr.foreach { item =>
        // write to db
        System.out.println("****************DOES NOT WORK******************" + new String(item.event.getBody().array(), "UTF-8"))
      }
      //ConnectionPool.returnConnection(connection)  // return the connection once, after the loop
    }
    //rdd.count()
  } else {
    System.out.println("**********************************WE GOT NOTHIN")
  }
}
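
The per-partition write pattern above can be sketched without a Spark cluster. The `ConnectionPool` here is a hypothetical stand-in (a buffer that records writes so the sketch is runnable), not a real library; the point is the shape of the code: acquire one connection per partition, write every record, and return the connection once after the loop, not inside the record loop as the commented-out code suggests.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a real connection pool (an assumption, not a Spark API).
// A "connection" is modeled as a buffer that records writes so the sketch runs locally.
object ConnectionPool {
  val written = ArrayBuffer[String]()
  var open = 0
  def getConnection(): ArrayBuffer[String] = { open += 1; written }
  def returnConnection(conn: ArrayBuffer[String]): Unit = { open -= 1 }
}

// One connection per partition; write each record; return the connection
// AFTER the loop. Returning it per record would churn the pool needlessly.
def writePartition(partitionItr: Iterator[Array[Byte]]): Unit = {
  val connection = ConnectionPool.getConnection()
  partitionItr.foreach { body =>
    connection += new String(body, "UTF-8") // stand-in for the DB write
  }
  ConnectionPool.returnConnection(connection)
}
```

In real Spark code this body would be passed to `rdd.foreachPartition(writePartition)`. Note also that `foreachPartition` closures execute on the executors, so their `println` output goes to the executor logs (or the worker console in local mode), not necessarily the driver's console where `rdd.collect()` output appears, which can make the branch look like it never ran.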