我正在 Flink 中进行实时流处理,其中 Kafka 是消息队列。我正在应用 120 秒的 EventTimeSlidingWindow。和1秒的幻灯片。我还在事件时间的每一秒插入水印。
我担心的是,如果元素迟到,在水印之后会发生什么?现在我的情况是,Flink 只是丢弃其各自水印之后的消息。filnk 是否提供了任何机制来处理此类迟到的消息,例如维护单独的窗口?我也浏览了文档,但我没有弄清楚。
我尝试使用 flink 的表和 sql api 作为一个简单的示例,我从文件中读取字符串,将其转换为 Tuple2 并尝试将其插入表中。这是我的代码。
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.table.StreamTableEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.table.Table;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class table_streaming_test
{
public static void main (String[] args) throws Exception
{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //create execution environment
StreamTableEnvironment tEnv= StreamTableEnvironment.getTableEnvironment(env);
env.setParallelism(1);
DataStream<String> datastream_in= env.readTextFile("file:/home/rishikesh/new_workspace1/table_streaming/stocks.txt");
DataStream<Tuple2<String,Integer>> ds= datastream_in
.flatMap(new Splitter()); // transformation flatmap
Table msg=tEnv.fromDataStream(ds).as("symbol,price");
Table result = msg.select("symbol ='A'");
DataStream<String> ds2 =tEnv.toDataStream(result, String.class);
ds2.print();
env.execute();
}
public static class Splitter implements FlatMapFunction<String, Tuple2<String, …Run Code Online (Sandbox Code Playgroud)