KeyedStream 中的 max 和 maxBy 有什么区别

Tom*_*Tom 4 apache-flink

  1. KeyedStream#max(字符串字段)

应用聚合,该聚合根据给定键给出给定字段表达式处的数据流的当前最大值。每个键都保留一个独立的聚合。字段表达式可以是公共字段的名称,也可以是带有 {@link DataStream} 基础类型括号的 getter 方法。点可用于深入查看对象,如 {@code "field1.fieldxy" } 中所示。

  1. KeyedStream#maxBy(字符串字段)

应用聚合,通过给定键为当前元素提供给定位置处的最大值。每个键都保留一个独立的聚合。如果有多个元素在给定位置具有最大值,则该运算符默认返回第一个。

这两个API的javadoc看起来非常相似,我想问一下它们之间有什么区别,以及何时选择这个或那个

小智 5

max 和 maxBy 之间的区别在于max 返回最大值,而 maxBy 返回该字段中具有最大值的元素。

 keyedStream.max(0);
 keyedStream.max("key");
 keyedStream.maxBy(0);
 keyedStream.maxBy("key");
Run Code Online (Sandbox Code Playgroud)

在下面的例子中,我们也可以看到区别:

使用max

  // Create a Tumbling Window with the values of 1 day:
            .timeWindow(Time.of(1, TimeUnit.DAYS))
            // Use the max Temperature of the day:
            .max("temperature")
            // And perform an Identity map, because we want to write all values of this day to the Database:
            .map(new MapFunction<elastic.model.LocalWeatherData, elastic.model.LocalWeatherData>() {
                @Override
                public elastic.model.LocalWeatherData map(elastic.model.LocalWeatherData localWeatherData) throws Exception {
                    return localWeatherData;
                }
            });
Run Code Online (Sandbox Code Playgroud)

使用maxBy

  // Now take the Maximum Temperature per day from the KeyedStream:
    DataStream<LocalWeatherData> maxTemperaturePerDay =
            localWeatherDataByStation
                    // Use non-overlapping tumbling window with 1 day length:
                    .timeWindow(Time.days(1))
                    // And use the maximum temperature:
                    .maxBy("temperature");
Run Code Online (Sandbox Code Playgroud)