小编ab8*_*853的帖子

Apache Spark SQL UDAF over window显示重复输入的奇怪行为

我发现在Apache Spark SQL(版本2.2.0)中,当在窗口规范上使用的用户定义的聚合函数(UDAF)提供了多行相同的输入时,UDAF(看似)不会调用evaluate方法正确.

我已经能够在Java和Scala中,本地和群集上重现这种行为.下面的代码显示了一个示例,如果行在前一行的1秒内,则标记为false.

class ExampleUDAF(val timeLimit: Long) extends UserDefinedAggregateFunction {
  def deterministic: Boolean = true
  def inputSchema: StructType = StructType(Array(StructField("unix_time", LongType)))
  def dataType: DataType = BooleanType

  def bufferSchema = StructType(Array(
    StructField("previousKeepTime", LongType),
    StructField("keepRow", BooleanType)
  ))

  def initialize(buffer: MutableAggregationBuffer) = {
    buffer(0) = 0L
    buffer(1) = false
  }

  def update(buffer: MutableAggregationBuffer, input: Row) = {    
    if (buffer(0) == 0L) {
      buffer(0) = input.getLong(0)
      buffer(1) = true
    } else {
      val timeDiff = input.getLong(0) - buffer.getLong(0)

      if (timeDiff < …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql

13
推荐指数
1
解决办法
457
查看次数

从Apache Spark SQL中的用户定义聚合函数(UDAF)返回多个数组

我正在尝试使用Apache Spark SQL在Java中创建用户定义的聚合函数(UDAF),该函数在完成时返回多个数组.我在网上搜索过,找不到任何有关如何执行此操作的示例或建议.

我能够返回单个数组,但无法弄清楚如何在evaluate()方法中以正确的格式获取数据以返回多个数组.

UDAF确实有效,因为我可以在evaluate()方法中打印出数组,我只是无法弄清楚如何将这些数组返回到调用代码(下面显示以供参考).

UserDefinedAggregateFunction customUDAF = new CustomUDAF();
DataFrame resultingDataFrame = dataFrame.groupBy().agg(customUDAF.apply(dataFrame.col("long_col"), dataFrame.col("double_col"))).as("processed_data");
Run Code Online (Sandbox Code Playgroud)

我在下面包含了整个自定义UDAF类,但关键方法是dataType()和evaluate方法(),它们首先显示.

任何帮助或建议将不胜感激.谢谢.

public class CustomUDAF extends UserDefinedAggregateFunction {

    @Override
    public DataType dataType() {
        // TODO: Is this the correct way to return 2 arrays?
        return new StructType().add("longArray", DataTypes.createArrayType(DataTypes.LongType, false))
            .add("dataArray", DataTypes.createArrayType(DataTypes.DoubleType, false));
    }

    @Override
    public Object evaluate(Row buffer) {
        // Data conversion
        List<Long> longList = new ArrayList<Long>(buffer.getList(0));
        List<Double> dataList = new ArrayList<Double>(buffer.getList(1));

        // Processing of data (omitted)

        // TODO: How to get data into format …
Run Code Online (Sandbox Code Playgroud)

java aggregate-functions user-defined-functions apache-spark apache-spark-sql

9
推荐指数
1
解决办法
4117
查看次数

如何访问 Spark Streaming 自定义接收器存储的元数据?

Spark Streaming 提供了创建自定义接收器的功能,详细信息请参见此处。为了将接收器接收到的数据存储到Spark中,store(data)需要使用该方法。

我存储到 Spark 的数据具有与其关联的某些属性。Spark Receiver 类由自定义接收器扩展,提供了几种 形式的存储方法store(data, metadata),这意味着元数据/属性可以与数据一起存储。下面的代码摘录显示了我如何使用此方法来存储数据及其元数据/属性。

public class CustomReceiver extends Receiver<String> {

    public CustomReceiver() {
        super(StorageLevel.MEMORY_AND_DISK_2());
    }

    @Override
    public void onStart() {
        new Thread() {
            @Override
            public void run() {
                try {
                    receive();
                } catch (IOException e) {
                    restart("Error connecting: ", e);
                }
            }
        }.start();
    }

    @Override
    public void onStop() {
        // Not needed as receive() method closes resources when stopped
    }

    private void receive() throws IOException {
        String …
Run Code Online (Sandbox Code Playgroud)

java apache-spark spark-streaming

6
推荐指数
1
解决办法
811
查看次数