标签: flink-streaming

如何高效地将flink pipeline中的数据写入redis

我正在 Apache flink sql api 中构建管道。该管道执行简单的投影查询。但是,我需要在查询之前编写一次元组(确切地说是每个元组中的一些元素),在查询之后编写一次。事实证明,我用来写入 Redis 的代码严重降低了性能。即flink在很小的数据速率下做出反压。我的代码有什么问题以及如何改进。有什么建议请。

当我停止写入redis之前和之后,性能都非常好。这是我的管道代码:

public class QueryExample {
    public static Long throughputCounterAfter=new Long("0");
    public static void main(String[] args) {
        int k_partitions = 10;
        reamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(5 * 32);
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "zookeeper-node-01:2181");
        props.setProperty("bootstrap.servers", "kafka-node-01:9092,kafka-node-02:9092,kafka-node-03:9092");
        // not to be shared with another job consuming the same topic
        props.setProperty("group.id", "flink-group");
        props.setProperty("enable.auto.commit","false");
        FlinkKafkaConsumer011<String> purchasesConsumer=new FlinkKafkaConsumer011<String>("purchases",
                new SimpleStringSchema(),
                props);

        DataStream<String> purchasesStream = env
                .addSource(purchasesConsumer)
                .setParallelism(Math.min(5 * 32, k_partitions));
        DataStream<Tuple4<Integer, Integer, Integer, Long>> purchaseWithTimestampsAndWatermarks …
Run Code Online (Sandbox Code Playgroud)

redis backpressure apache-flink flink-streaming flink-sql

0
推荐指数
1
解决办法
2314
查看次数

Flink 窗口函数 getResult 未触发

我正在尝试在我的 Flink 作业中使用事件时间,并BoundedOutOfOrdernessTimestampExtractor用于提取时间戳并生成水印。但是我有一些输入Kafka具有稀疏流,它可以长时间没有数据,这使得根本没有调用getResultin AggregateFunction。我可以看到数据正在add发挥作用。

我已经设置getEnv().getConfig().setAutoWatermarkInterval(1000L); 我试过了

 eventsWithKey
            .keyBy(entry -> (String) entry.get(key))
            .window(TumblingEventTimeWindows.of(Time.minutes(windowInMinutes)))
            .allowedLateness(WINDOW_LATENESS)
            .aggregate(new CountTask(basicMetricTags, windowInMinutes))
Run Code Online (Sandbox Code Playgroud)

还有会话窗口

eventsWithKey
            .keyBy(entry -> (String) entry.get(key))
            .window(EventTimeSessionWindows.withGap(Time.seconds(30)))
            .aggregate(new CountTask(basicMetricTags, windowInMinutes))
Run Code Online (Sandbox Code Playgroud)

所有水印指标都显示No Watermark 如何让 Flink 忽略无水印的东西?

apache-flink flink-streaming

0
推荐指数
1
解决办法
656
查看次数

生成自己的数据的 Flink 流式传输示例

早些时候我询问了Flink 的一个简单的hello world 示例。这给了我一些很好的例子!

\n

不过,我想询问更多 \xe2\x80\x98streaming\xe2\x80\x99 示例,其中我们每秒生成一个输入值。理想情况下,这应该是随机的,但即使每次都是相同的值也可以。

\n

目标是获得一个在没有/最少外部接触的情况下 \xe2\x80\x98moves\xe2\x80\x99 的流。

\n

因此我的问题是:

\n

如何在没有外部依赖的情况下显示 Flink 实际上流式传输数据?

\n

我找到了如何通过在外部生成数据并写入 Kafka 或收听公共源来展示这一点,但是我试图以最小的依赖来解决它(就像从 Nifi 中的GenerateFlowFile 开始)。

\n

apache-flink flink-streaming

0
推荐指数
1
解决办法
1901
查看次数

哪个设置检查点间隔(毫秒)?

每个人。
请帮我。
我编写了 apache flink streraming 作业,它从 apache kafka 读取 json 消息(几秒钟内 500-1000 条消息),在 POJO 中反序列化它们并执行一些操作(filter-keyby-process-sink)。我使用具有 ExactlyOnce 语义的 RocksDB 状态后端。但我不明白我需要设置哪个检查点间隔?
有些论坛的人写的时间大多是 1000 或 5000 毫秒。我尝试将间隔设置为10ms、100ms、500ms、1000ms、5000ms。我没有注意到任何差异。

apache-flink flink-streaming flink-cep

0
推荐指数
1
解决办法
2592
查看次数

Scala:java.lang.ClassCastException:无法将 java.lang.invoke.SerializedLambda 的实例分配给 scala.Function1 类型的字段 Child1.myfun

我想在父类中将函数名称作为参数,以便子类可以设置它。此变量将用于父类的方法之一。

abstract class Parent[T: TypeInformation] {
   val myfun: T => Unit

   // A different method uses myfun
}

class Child1 extends Parent[User] {
   val service = new Service()

   val myfun: User => Unit = service.callme
}

class Service {
   def callme(user: User) => Unit = {
      println("We are here for user")
   }
}
Run Code Online (Sandbox Code Playgroud)

我是 Scala 的新手,但这看起来不错。虽然编译器没有抱怨,但我收到运行时异常并且工作无法启动:

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:250)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:427)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
    at …
Run Code Online (Sandbox Code Playgroud)

java scala apache-flink flink-streaming

0
推荐指数
1
解决办法
674
查看次数