标签: apache-flink

如何高效地将flink pipeline中的数据写入redis

我正在 Apache flink sql api 中构建管道。该管道执行简单的投影查询。但是,我需要在查询之前编写一次元组(确切地说是每个元组中的一些元素),在查询之后编写一次。事实证明,我用来写入 Redis 的代码严重降低了性能。即flink在很小的数据速率下做出反压。我的代码有什么问题以及如何改进。有什么建议请。

当我停止写入redis之前和之后,性能都非常好。这是我的管道代码:

public class QueryExample {
    public static Long throughputCounterAfter=new Long("0");
    public static void main(String[] args) {
        int k_partitions = 10;
        reamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(5 * 32);
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "zookeeper-node-01:2181");
        props.setProperty("bootstrap.servers", "kafka-node-01:9092,kafka-node-02:9092,kafka-node-03:9092");
        // not to be shared with another job consuming the same topic
        props.setProperty("group.id", "flink-group");
        props.setProperty("enable.auto.commit","false");
        FlinkKafkaConsumer011<String> purchasesConsumer=new FlinkKafkaConsumer011<String>("purchases",
                new SimpleStringSchema(),
                props);

        DataStream<String> purchasesStream = env
                .addSource(purchasesConsumer)
                .setParallelism(Math.min(5 * 32, k_partitions));
        DataStream<Tuple4<Integer, Integer, Integer, Long>> purchaseWithTimestampsAndWatermarks …
Run Code Online (Sandbox Code Playgroud)

redis backpressure apache-flink flink-streaming flink-sql

0
推荐指数
1
解决办法
2314
查看次数

如何在 Flink 中跳过损坏的消息?

如何在 Flink 中跳过损坏的消息

我有 DAG:KafkaSrcConsumer > FlatMap > Window > SinkFunction

现在,如果我在操作员“KafkaSrcConsumer”中收到来自 Kafka 的corruptedMessage,我想抛出/跳过该消息,并且我不想将该损坏的消息转发给下一个操作员“FlatMap”

我们如何在 Apache Flink 中实现这一点?

(注意:从 KafkaSrcConsumer 抛出异常将重新启动 flink 作业,我想避免这种情况,因为我只想跳过消息并移至下一条消息)

apache-flink

0
推荐指数
1
解决办法
448
查看次数

Apache Flink:两个(或多个)任务管理器之间共享状态

假设我有两个任务管理器,每个任务管理器只有一个任务槽。现在,我有以下工作:

    KeyedStream<...> streamA = env.addSource(...).keyBy(...);
    KeyedStream<...> streamB = env.addSource(...).keyBy(...);

    streamA.connect(streamB).flatMap(new StatefulJoinFunction()).setParallelism(2);
Run Code Online (Sandbox Code Playgroud)

一个任务管理器将使用来自 Kafka 主题的数据,另一个任务管理器将使用来自另一个 Kafka 主题的数据。

我将作业发送给作业管理器来执行它。Flink 分配两个任务管理器来处理 flatMap(因为任务管理器只有一个任务槽)。

flatMap 在事件之间进行简单的连接(使用两个键控状态):

    public class StatefulJoinFunction extends RichCoFlatMapFunction<A, B, String> {
        private ValueState<A> AState;
        private ValueState<B> BState;

        @Override
        public void open(Configuration config) {
            AState = getRuntimeContext().getState(new ValueStateDescriptor<>("A event state", A.class));
            BState = getRuntimeContext().getState(new ValueStateDescriptor<>("B event state", B.class));
        }

        @Override
        public void flatMap1(A event, Collector<String> out) throws Exception {
            B secondEvent = BState.value();

            if (secondEvent == null)
                AState.update(event);
            else {
                out.collect(...); …
Run Code Online (Sandbox Code Playgroud)

state join apache-flink

0
推荐指数
1
解决办法
1999
查看次数

Flink 窗口函数 getResult 未触发

我正在尝试在我的 Flink 作业中使用事件时间,并BoundedOutOfOrdernessTimestampExtractor用于提取时间戳并生成水印。但是我有一些输入Kafka具有稀疏流,它可以长时间没有数据,这使得根本没有调用getResultin AggregateFunction。我可以看到数据正在add发挥作用。

我已经设置getEnv().getConfig().setAutoWatermarkInterval(1000L); 我试过了

 eventsWithKey
            .keyBy(entry -> (String) entry.get(key))
            .window(TumblingEventTimeWindows.of(Time.minutes(windowInMinutes)))
            .allowedLateness(WINDOW_LATENESS)
            .aggregate(new CountTask(basicMetricTags, windowInMinutes))
Run Code Online (Sandbox Code Playgroud)

还有会话窗口

eventsWithKey
            .keyBy(entry -> (String) entry.get(key))
            .window(EventTimeSessionWindows.withGap(Time.seconds(30)))
            .aggregate(new CountTask(basicMetricTags, windowInMinutes))
Run Code Online (Sandbox Code Playgroud)

所有水印指标都显示No Watermark 如何让 Flink 忽略无水印的东西?

apache-flink flink-streaming

0
推荐指数
1
解决办法
656
查看次数

生成自己的数据的 Flink 流式传输示例

早些时候我询问了Flink 的一个简单的hello world 示例。这给了我一些很好的例子!

\n

不过,我想询问更多 \xe2\x80\x98streaming\xe2\x80\x99 示例,其中我们每秒生成一个输入值。理想情况下,这应该是随机的,但即使每次都是相同的值也可以。

\n

目标是获得一个在没有/最少外部接触的情况下 \xe2\x80\x98moves\xe2\x80\x99 的流。

\n

因此我的问题是:

\n

如何在没有外部依赖的情况下显示 Flink 实际上流式传输数据?

\n

我找到了如何通过在外部生成数据并写入 Kafka 或收听公共源来展示这一点,但是我试图以最小的依赖来解决它(就像从 Nifi 中的GenerateFlowFile 开始)。

\n

apache-flink flink-streaming

0
推荐指数
1
解决办法
1901
查看次数

哪个设置检查点间隔(毫秒)?

每个人。
请帮我。
我编写了 apache flink streraming 作业,它从 apache kafka 读取 json 消息(几秒钟内 500-1000 条消息),在 POJO 中反序列化它们并执行一些操作(filter-keyby-process-sink)。我使用具有 ExactlyOnce 语义的 RocksDB 状态后端。但我不明白我需要设置哪个检查点间隔?
有些论坛的人写的时间大多是 1000 或 5000 毫秒。我尝试将间隔设置为10ms、100ms、500ms、1000ms、5000ms。我没有注意到任何差异。

apache-flink flink-streaming flink-cep

0
推荐指数
1
解决办法
2592
查看次数

如何在flink应用程序中指定两个源、一个进程运算符和一个接收器运算符

我使用的是flink 1.3,我定义了两个流源,它们将发出相同的事件以供后续运算符处理(我定义的进程运算符和接收器运算符)

但看起来在 source-process-pink 管道中,我只能指定一个源,我会问如何指定两个或多个源并执行相同的进程和接收器

object FlinkApplication {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    env.addSource(new MySource1()) //How to MySource2 here?
      .setParallelism(1)
      .name("source1")
      .process(new MyProcess())
      .setParallelism(4)
      .addSink(new MySink())
      .setParallelism(2)
    env.execute("FlinkApplication")
  }

}
Run Code Online (Sandbox Code Playgroud)

apache-flink

0
推荐指数
1
解决办法
1107
查看次数

Apache Flink 中的并行性如何工作?

假设我有一个包含 3 个节点的 Flink 集群。一个节点用于作业管理器,另外 2 个节点用于任务管理器。每个任务管理器有 3 个任务槽。因此,当我提交并行度等于 2 的作业时,Flink 将分配两个任务槽。那么,我的问题是,Flink 将如何分配这些任务槽?

一些场景

Flink 是否为每个任务管理器分配一个任务槽?

两个任务槽是否有可能从同一个任务管理器分配?如果是,如果该特定节点由于某种原因关闭,我的作业将无法运行。在这种情况下如何避免停机?

apache-flink

0
推荐指数
1
解决办法
618
查看次数

Scala:java.lang.ClassCastException:无法将 java.lang.invoke.SerializedLambda 的实例分配给 scala.Function1 类型的字段 Child1.myfun

我想在父类中将函数名称作为参数,以便子类可以设置它。此变量将用于父类的方法之一。

abstract class Parent[T: TypeInformation] {
   val myfun: T => Unit

   // A different method uses myfun
}

class Child1 extends Parent[User] {
   val service = new Service()

   val myfun: User => Unit = service.callme
}

class Service {
   def callme(user: User) => Unit = {
      println("We are here for user")
   }
}
Run Code Online (Sandbox Code Playgroud)

我是 Scala 的新手,但这看起来不错。虽然编译器没有抱怨,但我收到运行时异常并且工作无法启动:

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:250)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:427)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
    at …
Run Code Online (Sandbox Code Playgroud)

java scala apache-flink flink-streaming

0
推荐指数
1
解决办法
674
查看次数

Flink,使用“对象复用模式”的规则

Doc说这个模式会出现bug,但是没有告诉我这个模式的使用规则,什么情况下会出现bug?假设我有一份工作,

  1. 来源:kafka(字节[]数据),
  2. flat-map:将 byte[] 解析为 Google Protobuf 对象 'foo',创建一个 Tuple2<>(foo.id, foo),并返回这个 tuple2
  3. keyby 和 process:对于每个 id,将第一个 foo 放入 ValueState,如果有多个具有相同 id 的对象,则更新 ValueState。10 秒后发出第一个 foo(updated)。

这种情况下,开启‘对象复用模式’可以吗?

java apache-flink

0
推荐指数
1
解决办法
2608
查看次数