Jar*_*ims 5 scala flume apache-spark spark-streaming
I'm trying to process events from a Flume Avro sink with Spark Streaming, following the foreachRDD design pattern, but for some reason the code marked "DOES NOT WORK" never executes. rdd.partitions.size returns 1, yet it doesn't even iterate over that single partition. P.S. I'm a Scala noob.
events.foreachRDD { rdd =>
  if (rdd.take(1).size == 1) {
    System.out.println("**********************************WE GOT AN RDD")
    System.out.println("*******************************NUM PARTITIONS =" + rdd.partitions.size)
    val array = rdd.collect()
    array.foreach { x =>
      System.out.println("**************WORKS********************" + new String(x.event.getBody().array(), "UTF-8"))
    }
    rdd.foreachPartition { partitionItr =>
      //System.out.println("**********************************WE NEVER GET HERE " + partitionItr.size)
      //create db connection from pool
      //val connection = ConnectionPool.getConnection()
      partitionItr.foreach { item =>
        //write to db
        System.out.println("****************DOES NOT WORK******************" + new String(item.event.getBody().array(), "UTF-8"))
        //return connection to pool
        //ConnectionPool.returnConnection(connection)
      }
    }
    //rdd.count()
  } else {
    System.out.println("**********************************WE GOT NOTHIN")
  }
}
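A side note on the commented-out `partitionItr.size` debug line: if it were uncommented, it would silently break the loop below it, because a Scala `Iterator` is single-pass and computing its size consumes every element. A minimal plain-Scala sketch of this pitfall (no Spark required; `IteratorGotcha` is just an illustrative name):

```scala
// A Scala Iterator is single-pass: asking for its size traverses (and
// therefore exhausts) it, so any foreach afterwards sees no elements.
object IteratorGotcha extends App {
  val partitionItr = Iterator("event1", "event2", "event3")

  // size consumes the iterator while counting
  println("size = " + partitionItr.size) // size = 3

  // the iterator is now spent, so this loop body never runs
  partitionItr.foreach(e => println("got " + e))

  println("exhausted = " + partitionItr.isEmpty) // exhausted = true
}
```

If the count is needed for debugging, materialize the iterator first (e.g. `val items = partitionItr.toList`) and iterate the list instead.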
Views: 472