Mar*_*ria 1 java apache-spark spark-structured-streaming
如何在 Spark Java 中使用以下函数?查遍了互联网但找不到合适的例子。
public void foreachPartition(scala.Function1<scala.collection.Iterator<T>,scala.runtime.BoxedUnit> f)
Run Code Online (Sandbox Code Playgroud)
我唯一知道的是它对流程有好处batch of data,所谓的BoxedUnit。
如何获取或batch ID批量处理数据?BoxedUnitdataset
谁能告诉我如何实现这个方法?
我认为您对现状有错误的印象BoxedUnit,因此坚持使用 Java 中的 Scala 接口,由于 Scala 中暴露给 Java 的大量隐藏复杂性,该接口过于复杂。scala.Function1<scala.collection.Iterator<T>, scala.runtime.BoxedUnit>是 - 一个 Scala 函数的实现(Iterator[T]) => Unit,它接受Iterator[T]并返回Unit类型。UnitScala 中的 相当于 Java 中的void. BoxedUnit是装箱版本Unit- 它是一个堆对象,在其成员中保存单例单元值UNIT,并且是 Scala 程序中几乎从未出现的实现细节。如果数据集是 a DataFrame,那么您T需要在对象org.apache.spark.sql.Row集合上处理 Scala 迭代器Row。
要定义 Java 中的某些内容scala.Function1<scala.collection.Iterator<Row>, scala.runtime.BoxedUnit>,您需要创建 的实例AbstractFunction1<scala.collection.Iterator<Row>, scala.runtime.BoxedUnit>并重写其apply()必须返回 的方法BoxedUnit.UNIT。您还需要使其可序列化,因此您通常声明自己的类继承AbstractFunction1并实现Serializable. 您还可以通过公开一个不同的、更 Java 友好的抽象方法来对其进行 Java 化,以便稍后覆盖:
import org.apache.spark.sql.Row;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;
import scala.collection.JavaConverters;
import java.util.Iterator;
class MyPartitionFunction<T> extends AbstractFunction1<scala.collection.Iterator<T>, BoxedUnit>
implements Serializable {
@Override
public BoxedUnit apply(scala.collection.Iterator<T> iterator) {
call(JavaConverters.asJavaIteratorConverter(iterator).asJava());
return BoxedUnit.UNIT;
}
public abstract void call(Iterator<T> iterator);
}
df.foreachPartition(new MyPartitionFunction<Row>() {
@Override
public void call(Iterator<Row> iterator) {
for (Row row : iterator) {
// do something with the row
}
}
});
Run Code Online (Sandbox Code Playgroud)
这是相当大的实现复杂性,这就是为什么有 Java 特定版本来代替ForeachPartitionFunction<T>,上面的代码变成:
import org.apache.spark.sql.Row;
import org.apache.spark.api.java.function.ForeachPartitionFunction;
import java.util.Iterator;
df.foreachPartition(new ForeachPartitionFunction<Row>() {
public void call(Iterator<Row> iterator) throws Exception {
for (Row row : iterator) {
// do something with the row
}
}
}
Run Code Online (Sandbox Code Playgroud)
其功能与 Scala 接口提供的功能完全相同,只是 Apache Spark 为您进行迭代器转换,并且它还为您提供了一个友好的 Java 类,不需要您导入和实现 Scala 类型。
也就是说,我认为您对 Spark 的工作原理有一些误解。您不需要用来foreachPartition批量处理流数据。Spark 的流引擎会自动为您完成此操作。您编写指定转换和聚合的流查询,然后随着新数据从流中到达而逐步应用这些转换和聚合。
foreachPartition是为某些特殊的批处理情况保留的一种形式foreach,例如,当您需要在处理函数中进行一些昂贵的对象实例化并且对每一行进行操作会产生巨大的开销时。由于foreachPartition每个分区仅调用处理函数一次,因此您可以实例化一次昂贵的对象,然后迭代分区的数据。这减少了处理时间,因为您只需执行一次昂贵的操作。
但是,您甚至无法调用foreach()或foreachPartition()调用流媒体源,因为这会导致AnalysisException. 相反,您必须使用的foreach()或foreachBatch()方法DataStreamWriter。DataStreamWriter.foreach()接受ForeachWriterwhile的一个实例DataStreamWriter.foreachBatch(),接受一个接收数据集和批次 ID 的 void 函数。ForeachWriter在其方法中接收纪元 ID open()。同样,foreachBatch()Scala 和 Java 风格在功能上是相同的,因此如果您要使用 Java 编写,请使用 Java 特定的风格。