我发现在Apache Spark SQL(版本2.2.0)中,当在窗口规范上使用的用户定义的聚合函数(UDAF)提供了多行相同的输入时,UDAF(看似)不会调用evaluate
方法正确.
我已经能够在Java和Scala中,本地和群集上重现这种行为.下面的代码显示了一个示例,如果行在前一行的1秒内,则标记为false.
class ExampleUDAF(val timeLimit: Long) extends UserDefinedAggregateFunction {
def deterministic: Boolean = true
def inputSchema: StructType = StructType(Array(StructField("unix_time", LongType)))
def dataType: DataType = BooleanType
def bufferSchema = StructType(Array(
StructField("previousKeepTime", LongType),
StructField("keepRow", BooleanType)
))
def initialize(buffer: MutableAggregationBuffer) = {
buffer(0) = 0L
buffer(1) = false
}
def update(buffer: MutableAggregationBuffer, input: Row) = {
if (buffer(0) == 0L) {
buffer(0) = input.getLong(0)
buffer(1) = true
} else {
val timeDiff = input.getLong(0) - buffer.getLong(0)
if (timeDiff < …
Run Code Online (Sandbox Code Playgroud) 我正在尝试使用Apache Spark SQL在Java中创建用户定义的聚合函数(UDAF),该函数在完成时返回多个数组.我在网上搜索过,找不到任何有关如何执行此操作的示例或建议.
我能够返回单个数组,但无法弄清楚如何在evaluate()方法中以正确的格式获取数据以返回多个数组.
UDAF确实有效,因为我可以在evaluate()方法中打印出数组,我只是无法弄清楚如何将这些数组返回到调用代码(下面显示以供参考).
UserDefinedAggregateFunction customUDAF = new CustomUDAF();
DataFrame resultingDataFrame = dataFrame.groupBy().agg(customUDAF.apply(dataFrame.col("long_col"), dataFrame.col("double_col"))).as("processed_data");
Run Code Online (Sandbox Code Playgroud)
我在下面包含了整个自定义UDAF类,但关键方法是dataType()和evaluate方法(),它们首先显示.
任何帮助或建议将不胜感激.谢谢.
public class CustomUDAF extends UserDefinedAggregateFunction {
@Override
public DataType dataType() {
// TODO: Is this the correct way to return 2 arrays?
return new StructType().add("longArray", DataTypes.createArrayType(DataTypes.LongType, false))
.add("dataArray", DataTypes.createArrayType(DataTypes.DoubleType, false));
}
@Override
public Object evaluate(Row buffer) {
// Data conversion
List<Long> longList = new ArrayList<Long>(buffer.getList(0));
List<Double> dataList = new ArrayList<Double>(buffer.getList(1));
// Processing of data (omitted)
// TODO: How to get data into format …
Run Code Online (Sandbox Code Playgroud) java aggregate-functions user-defined-functions apache-spark apache-spark-sql
Spark Streaming 提供了创建自定义接收器的功能,详细信息请参见此处。为了将接收器接收到的数据存储到Spark中,store(data)
需要使用该方法。
我存储到 Spark 的数据具有与其关联的某些属性。Spark Receiver 类由自定义接收器扩展,提供了几种 形式的存储方法store(data, metadata)
,这意味着元数据/属性可以与数据一起存储。下面的代码摘录显示了我如何使用此方法来存储数据及其元数据/属性。
public class CustomReceiver extends Receiver<String> {
public CustomReceiver() {
super(StorageLevel.MEMORY_AND_DISK_2());
}
@Override
public void onStart() {
new Thread() {
@Override
public void run() {
try {
receive();
} catch (IOException e) {
restart("Error connecting: ", e);
}
}
}.start();
}
@Override
public void onStop() {
// Not needed as receive() method closes resources when stopped
}
private void receive() throws IOException {
String …
Run Code Online (Sandbox Code Playgroud)