小编kad*_*ank的帖子

Flink:使用后期元素加水印

我正在 Flink 中进行实时流处理,其中 Kafka 是消息队列。我正在应用 120 秒的 EventTimeSlidingWindow。和1秒的幻灯片。我还在事件时间的每一秒插入水印。

我担心的是,如果元素迟到,在水印之后会发生什么?现在我的情况是,Flink 只是丢弃其各自水印之后的消息。filnk 是否提供了任何机制来处理此类迟到的消息,例如维护单独的窗口?我也浏览了文档,但我没有弄清楚。

apache-flink flink-streaming

4
推荐指数
1
解决办法
2161
查看次数

Flink错误:java.lang.NoSuchMethodError:org.apache.flink.api.table.Table

我尝试使用 flink 的表和 sql api 作为一个简单的示例,我从文件中读取字符串,将其转换为 Tuple2 并尝试将其插入表中。这是我的代码。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.table.StreamTableEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.table.Table;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class table_streaming_test
{
    public static void main (String[] args) throws Exception
    {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //create execution environment
        StreamTableEnvironment tEnv= StreamTableEnvironment.getTableEnvironment(env); 
        env.setParallelism(1);
        DataStream<String> datastream_in= env.readTextFile("file:/home/rishikesh/new_workspace1/table_streaming/stocks.txt");
         DataStream<Tuple2<String,Integer>> ds=  datastream_in
             .flatMap(new Splitter());  // transformation flatmap
         Table msg=tEnv.fromDataStream(ds).as("symbol,price");
         Table result = msg.select("symbol ='A'");
         DataStream<String> ds2 =tEnv.toDataStream(result, String.class);
         ds2.print();
         env.execute();
    }
public static class Splitter implements FlatMapFunction<String,     Tuple2<String, …
Run Code Online (Sandbox Code Playgroud)

java apache-flink flink-streaming

2
推荐指数
1
解决办法
9185
查看次数

标签 统计

apache-flink ×2

flink-streaming ×2

java ×1