标签: apache-flink

由于 Avro 数组类型,Flink 抛出 Kryo 错误

我的 Flink 反序列化器中的方法出现以下错误getProducedType

com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
values (org.apache.avro.generic.GenericData$Record)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:189)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:355)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:85)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:152)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:624)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
    at org.apache.avro.generic.GenericData$Array.add(GenericData.java:277)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:378)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:289)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
Run Code Online (Sandbox Code Playgroud)

解串器:

class AvroDeserializer[T <: GenericRecord : ClassTag](topic: String, …
Run Code Online (Sandbox Code Playgroud)

avro kryo apache-flink flink-streaming

3
推荐指数
1
解决办法
2423
查看次数

为什么 Flink 仪表板不显示从源接收或写入接收器的记录数?

Flink 仪表板很棒,显示了正在运行的作业的大量详细信息。然而,我注意到的一件事是,作业的源和接收器将分别显示接收的记录和发送的记录0

现在我知道他们仍然在工作之外接收和发送记录,但这0往往会让人们感到非常困惑。选择这样是有原因的吗?或者有什么方法可以让它不发生0

特别是对于接收器,如果序列化模式无法序列化消息(并且捕获并记录错误而不是导致作业失败),您将无法看到接收器实际输出的数字来反映这一点。你总是看到0并假设一切都成功了。

dashboard apache-flink

3
推荐指数
1
解决办法
2385
查看次数

ProcessWindowFunction 中的 Apache Flink 状态

我尝试了解 ProcessWindowFunction 中可以使用的各种状态的差异。

首先,ProcessWindowFunction是一个AbstractRichFunction

abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window]
    extends AbstractRichFunction {...}
Run Code Online (Sandbox Code Playgroud)

因此它可以使用该方法

public RuntimeContext getRuntimeContext() 
Run Code Online (Sandbox Code Playgroud)

获得一个状态

getRuntimeContext().getState
Run Code Online (Sandbox Code Playgroud)

更多,WindowProcessFunction的处理函数

def process(key: KEY, context: Context, elements: Iterable[IN], out: 
    Collector[OUT]) {}
Run Code Online (Sandbox Code Playgroud)

有一个上下文,其中又有两种方法允许我获取状态:

/**
  * State accessor for per-key and per-window state.
  */
def windowState: KeyedStateStore

/**
  * State accessor for per-key global state.
  */
def globalState: KeyedStateStore
Run Code Online (Sandbox Code Playgroud)

这是我的问题:

1)这些与 getRuntimeContext().getState 有什么关系?

2)我经常使用自定义触发器实现和全局窗口。在这种情况下,使用 getPartitionedState 检索状态。我可以在触发函数中访问 WindowProcessFunction 中定义的窗口状态吗?如果是这样怎么办?

3)Trigger类中没有可以重写的open方法,状态创建是如何处理的?只调用 getPartitionedState 是否安全,它还管理状态创建?

apache-flink

3
推荐指数
1
解决办法
1869
查看次数

如何在 flink 中统一测试指标

我在 flink 中有一个数据流,并使用 ProcessFunction 中的量规生成自己的指标。
由于这些指标对我的活动很重要,因此我想在流程执行后对它们进行单元测试。
不幸的是,我没有找到一种方法来实现适当的测试报告器。这是解释我的问题的简单代码。
这段代码有两个问题:

  1. 我如何触发仪表
  2. 如何通过 env.execute 实例化记者

这是样本

import java.util.concurrent.atomic.AtomicInteger

import org.apache.flink.api.scala.metrics.ScalaGauge
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.metrics.reporter.AbstractReporter
import org.apache.flink.metrics.{Gauge, Metric, MetricConfig}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.util.Collector
import org.scalatest.FunSuite
import org.scalatest.Matchers._
import org.scalatest.PartialFunctionValues._

import scala.collection.JavaConverters._
import scala.collection.mutable

/* Test based on Flink test example https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/testing.html */

class MultiplyByTwo extends ProcessFunction[Long, Long] {
  override def processElement(data: Long, context: ProcessFunction[Long, Long]#Context, collector: Collector[Long]): Unit = {
    collector.collect(data * 2L)
  }

  val nbrCalls …
Run Code Online (Sandbox Code Playgroud)

scala apache-flink

3
推荐指数
1
解决办法
1808
查看次数

Flink 窗口:聚合并输出到接收器

我们有一个数据流,其中每个元素都是这种类型:

id: String
type: Type
amount: Integer
Run Code Online (Sandbox Code Playgroud)

amount我们想要聚合这个流并输出每周一次的总和。

目前的解决方案:

Flink 管道示例如下所示:

stream.keyBy(type)
      .window(TumblingProcessingTimeWindows.of(Time.days(7)))
      .reduce(sumAmount())
      .addSink(someOutput())
Run Code Online (Sandbox Code Playgroud)

用于输入

| id | type | amount |
| 1  | CAT  | 10     |
| 2  | DOG  | 20     |
| 3  | CAT  | 5      |
| 4  | DOG  | 15     |
| 5  | DOG  | 50     |
Run Code Online (Sandbox Code Playgroud)

如果窗口在记录之间结束34我们的输出将是:

| TYPE | sumAmount |
| CAT  | 15        | (id 1 and id 3 added …
Run Code Online (Sandbox Code Playgroud)

apache-flink flink-streaming

3
推荐指数
1
解决办法
7791
查看次数

Flink 中的水印和触发器有什么区别?

我读到,“..排序运算符必须缓冲它接收到的所有元素。然后,当它接收到水印时,它可以对时间戳低于水印的所有元素进行排序,并按排序顺序发出它们。这是正确,因为水印表明不能有更多元素到达并与已排序元素混合......” - https://cwiki.apache.org/confluence/display/FLINK/Time+and+Order+in+Streams

因此,水印似乎充当后续操作员开始处理的信号。我想,这也是触发器的作用。两者有什么区别?

real-time stream bigdata apache-flink flink-streaming

3
推荐指数
1
解决办法
1402
查看次数

如何在apache kafka连接器中实现一次语义

我使用的是 flink 版本 1.8.0 。我的应用程序从 kafka 读取数据 -> 转换 -> 发布到 Kafka。为了避免重新启动期间出现任何重复,我想使用带有 Exactly Once 语义的 kafka 生产者,请在此处阅读:

https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/connectors/kafka.html#kafka-011-and-newer

我的卡夫卡版本是 1.1 。

        return new FlinkKafkaProducer<String>( topic,  new KeyedSerializationSchema<String>() {


            public byte[] serializeKey(String element) {
                // TODO Auto-generated method stub
                return element.getBytes();
            }


            public byte[] serializeValue(String element) {
                // TODO Auto-generated method stub
                return element.getBytes();
            }


            public String getTargetTopic(String element) {
                // TODO Auto-generated method stub
                return topic;
            }
        },prop, opt, FlinkKafkaProducer.Semantic.EXACTLY_ONCE, 1);
Run Code Online (Sandbox Code Playgroud)

检查点代码:

    CheckpointConfig checkpointConfig = env.getCheckpointConfig();
    checkpointConfig.setCheckpointTimeout(15 * 1000 ); …
Run Code Online (Sandbox Code Playgroud)

apache-flink flink-streaming

3
推荐指数
1
解决办法
1446
查看次数

flink kafka生产者在检查点恢复时以一次模式发送重复消息

我正在写一个案例来测试 flink 两步提交,下面是概述。

在此输入图像描述

sink kafka曾经是kafka生产者。sink step是 mysql 接收器扩展two step commitsink compare是mysql的sinkextend two step commit,这个sink偶尔会抛出异常来模拟检查点失败。

当检查点失败并恢复时,我发现mysql两步提交可以正常工作,但是kafka消费者将读取上次成功的偏移量,并且kafka生产者会产生消息,即使他在这个检查点失败之前已经完成了

在这种情况下如何避免重复消息?

感谢帮助。

环境

  • 弗林克1.9.1

  • 爪哇1.8

  • 卡夫卡2.11

卡夫卡生产者代码:

        dataStreamReduce.addSink(new FlinkKafkaProducer<>(
                "flink_output",
                new KafkaSerializationSchema<Tuple4<String, String, String, Long>>() {
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(Tuple4<String, String, String, Long> element, @Nullable Long timestamp) {
                        UUID uuid = UUID.randomUUID();
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("uuid", uuid.toString());
                        jsonObject.put("key1", element.f0);
                        jsonObject.put("key2", element.f1);
                        jsonObject.put("key3", element.f2);
                        jsonObject.put("indicate", element.f3);
                        return new ProducerRecord<>("flink_output", jsonObject.toJSONString().getBytes(StandardCharsets.UTF_8));
                    }
                }, …
Run Code Online (Sandbox Code Playgroud)

streaming blink apache-flink flink-streaming

3
推荐指数
1
解决办法
1346
查看次数

Flink 中关于事件时间处理的水印是什么?为什么需要它。?

Flink 中关于事件时间处理的水印是什么?为什么需要它。?为什么在所有使用事件时间的情况下都需要它。在所有情况下,我的意思是如果我不进行窗口操作那么为什么我们仍然需要水印。我来自火花背景。在 Spark 中,仅当我们在传入事件上使用窗口时才需要水印。

我读过几篇文章,在我看来,水印和窗口似乎是一样的。如果有差异,请解释并指出

发表你的回复我做了更多阅读。下面是一个更具体的查询。
主要问题:- 当我们接受迟到时,为什么还需要乱序。

假设
您有一个 BoundedOutOfOrdernessTimestampExtractor,其边界为 2 分钟,滚动窗口为 10 分钟,该窗口从 12:00 开始,到 12:10 结束:
12:01, A
12:04, B
WM, 12:02 // 12:04 - 2 分钟
12:02、中
12:08、深
12:14、东
西 西、12:12
12:16、西
西、12:14 // 12:16 - 2 分钟
12:09、西

在上面的示例中,[12:02, C] 记录不会被删除,而是包含在窗口 12:00 -12:10 中并稍后进行评估。-因此,水印也可以是事件时间戳

仅当配置了可接受的迟到时间 5 分钟时,记录 [12:09, G] 才会包含在窗口 12:00 - 12:10 中。这可以处理延迟和无序事件

所以现在添加我上面的问题,outoforder 选项必须是某个值(0 除外)的 BoundedOutOfOrdernessTimestampExtractor 而不是事件时间戳 istelf 吗?
什么是乱序可以实现而 allowedlateness 不能实现的以及在什么情况下可以实现?

apache-flink flink-streaming

3
推荐指数
1
解决办法
3602
查看次数

Flink 如何决定何时采取检查点?

我想了解什么决定了何时采取检查点。这与检查点间隔有何关系?

apache-flink flink-streaming

3
推荐指数
1
解决办法
1083
查看次数