我的 Flink 反序列化器中的方法出现以下错误getProducedType:
com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
values (org.apache.avro.generic.GenericData$Record)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:189)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:355)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:85)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:152)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:624)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at org.apache.avro.generic.GenericData$Array.add(GenericData.java:277)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:378)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:289)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
Run Code Online (Sandbox Code Playgroud)
解串器:
class AvroDeserializer[T <: GenericRecord : ClassTag](topic: String, …Run Code Online (Sandbox Code Playgroud) Flink 仪表板很棒,显示了正在运行的作业的大量详细信息。然而,我注意到的一件事是,作业的源和接收器将分别显示接收的记录和发送的记录0。
现在我知道他们仍然在工作之外接收和发送记录,但这0往往会让人们感到非常困惑。选择这样是有原因的吗?或者有什么方法可以让它不发生0?
特别是对于接收器,如果序列化模式无法序列化消息(并且捕获并记录错误而不是导致作业失败),您将无法看到接收器实际输出的数字来反映这一点。你总是看到0并假设一切都成功了。
我尝试了解 ProcessWindowFunction 中可以使用的各种状态的差异。
首先,ProcessWindowFunction是一个AbstractRichFunction
abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window]
extends AbstractRichFunction {...}
Run Code Online (Sandbox Code Playgroud)
因此它可以使用该方法
public RuntimeContext getRuntimeContext()
Run Code Online (Sandbox Code Playgroud)
获得一个状态
getRuntimeContext().getState
Run Code Online (Sandbox Code Playgroud)
更多,WindowProcessFunction的处理函数
def process(key: KEY, context: Context, elements: Iterable[IN], out:
Collector[OUT]) {}
Run Code Online (Sandbox Code Playgroud)
有一个上下文,其中又有两种方法允许我获取状态:
/**
* State accessor for per-key and per-window state.
*/
def windowState: KeyedStateStore
/**
* State accessor for per-key global state.
*/
def globalState: KeyedStateStore
Run Code Online (Sandbox Code Playgroud)
这是我的问题:
1)这些与 getRuntimeContext().getState 有什么关系?
2)我经常使用自定义触发器实现和全局窗口。在这种情况下,使用 getPartitionedState 检索状态。我可以在触发函数中访问 WindowProcessFunction 中定义的窗口状态吗?如果是这样怎么办?
3)Trigger类中没有可以重写的open方法,状态创建是如何处理的?只调用 getPartitionedState 是否安全,它还管理状态创建?
我在 flink 中有一个数据流,并使用 ProcessFunction 中的量规生成自己的指标。
由于这些指标对我的活动很重要,因此我想在流程执行后对它们进行单元测试。
不幸的是,我没有找到一种方法来实现适当的测试报告器。这是解释我的问题的简单代码。
这段代码有两个问题:
这是样本
import java.util.concurrent.atomic.AtomicInteger
import org.apache.flink.api.scala.metrics.ScalaGauge
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.metrics.reporter.AbstractReporter
import org.apache.flink.metrics.{Gauge, Metric, MetricConfig}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.util.Collector
import org.scalatest.FunSuite
import org.scalatest.Matchers._
import org.scalatest.PartialFunctionValues._
import scala.collection.JavaConverters._
import scala.collection.mutable
/* Test based on Flink test example https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/testing.html */
class MultiplyByTwo extends ProcessFunction[Long, Long] {
override def processElement(data: Long, context: ProcessFunction[Long, Long]#Context, collector: Collector[Long]): Unit = {
collector.collect(data * 2L)
}
val nbrCalls …Run Code Online (Sandbox Code Playgroud) 我们有一个数据流,其中每个元素都是这种类型:
id: String
type: Type
amount: Integer
Run Code Online (Sandbox Code Playgroud)
amount我们想要聚合这个流并输出每周一次的总和。
目前的解决方案:
Flink 管道示例如下所示:
stream.keyBy(type)
.window(TumblingProcessingTimeWindows.of(Time.days(7)))
.reduce(sumAmount())
.addSink(someOutput())
Run Code Online (Sandbox Code Playgroud)
用于输入
| id | type | amount |
| 1 | CAT | 10 |
| 2 | DOG | 20 |
| 3 | CAT | 5 |
| 4 | DOG | 15 |
| 5 | DOG | 50 |
Run Code Online (Sandbox Code Playgroud)
如果窗口在记录之间结束3,4我们的输出将是:
| TYPE | sumAmount |
| CAT | 15 | (id 1 and id 3 added …Run Code Online (Sandbox Code Playgroud) 我读到,“..排序运算符必须缓冲它接收到的所有元素。然后,当它接收到水印时,它可以对时间戳低于水印的所有元素进行排序,并按排序顺序发出它们。这是正确,因为水印表明不能有更多元素到达并与已排序元素混合......” - https://cwiki.apache.org/confluence/display/FLINK/Time+and+Order+in+Streams
因此,水印似乎充当后续操作员开始处理的信号。我想,这也是触发器的作用。两者有什么区别?
我使用的是 flink 版本 1.8.0 。我的应用程序从 kafka 读取数据 -> 转换 -> 发布到 Kafka。为了避免重新启动期间出现任何重复,我想使用带有 Exactly Once 语义的 kafka 生产者,请在此处阅读:
我的卡夫卡版本是 1.1 。
return new FlinkKafkaProducer<String>( topic, new KeyedSerializationSchema<String>() {
public byte[] serializeKey(String element) {
// TODO Auto-generated method stub
return element.getBytes();
}
public byte[] serializeValue(String element) {
// TODO Auto-generated method stub
return element.getBytes();
}
public String getTargetTopic(String element) {
// TODO Auto-generated method stub
return topic;
}
},prop, opt, FlinkKafkaProducer.Semantic.EXACTLY_ONCE, 1);
Run Code Online (Sandbox Code Playgroud)
检查点代码:
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setCheckpointTimeout(15 * 1000 ); …Run Code Online (Sandbox Code Playgroud) 我正在写一个案例来测试 flink 两步提交,下面是概述。
sink kafka曾经是kafka生产者。sink step是 mysql 接收器扩展two step commit。sink compare是mysql的sinkextend two step commit,这个sink偶尔会抛出异常来模拟检查点失败。
当检查点失败并恢复时,我发现mysql两步提交可以正常工作,但是kafka消费者将读取上次成功的偏移量,并且kafka生产者会产生消息,即使他在这个检查点失败之前已经完成了。
在这种情况下如何避免重复消息?
感谢帮助。
环境:
弗林克1.9.1
爪哇1.8
卡夫卡2.11
卡夫卡生产者代码:
dataStreamReduce.addSink(new FlinkKafkaProducer<>(
"flink_output",
new KafkaSerializationSchema<Tuple4<String, String, String, Long>>() {
@Override
public ProducerRecord<byte[], byte[]> serialize(Tuple4<String, String, String, Long> element, @Nullable Long timestamp) {
UUID uuid = UUID.randomUUID();
JSONObject jsonObject = new JSONObject();
jsonObject.put("uuid", uuid.toString());
jsonObject.put("key1", element.f0);
jsonObject.put("key2", element.f1);
jsonObject.put("key3", element.f2);
jsonObject.put("indicate", element.f3);
return new ProducerRecord<>("flink_output", jsonObject.toJSONString().getBytes(StandardCharsets.UTF_8));
}
}, …Run Code Online (Sandbox Code Playgroud) Flink 中关于事件时间处理的水印是什么?为什么需要它。?为什么在所有使用事件时间的情况下都需要它。在所有情况下,我的意思是如果我不进行窗口操作那么为什么我们仍然需要水印。我来自火花背景。在 Spark 中,仅当我们在传入事件上使用窗口时才需要水印。
我读过几篇文章,在我看来,水印和窗口似乎是一样的。如果有差异,请解释并指出
发表你的回复我做了更多阅读。下面是一个更具体的查询。
主要问题:- 当我们接受迟到时,为什么还需要乱序。
假设
您有一个 BoundedOutOfOrdernessTimestampExtractor,其边界为 2 分钟,滚动窗口为 10 分钟,该窗口从 12:00 开始,到 12:10 结束:
12:01, A
12:04, B
WM, 12:02 // 12:04 - 2 分钟
12:02、中
12:08、深
12:14、东
西 西、12:12
12:16、西
西、12:14 // 12:16 - 2 分钟
12:09、西
在上面的示例中,[12:02, C] 记录不会被删除,而是包含在窗口 12:00 -12:10 中并稍后进行评估。-因此,水印也可以是事件时间戳
仅当配置了可接受的迟到时间 5 分钟时,记录 [12:09, G] 才会包含在窗口 12:00 - 12:10 中。这可以处理延迟和无序事件
所以现在添加我上面的问题,outoforder 选项必须是某个值(0 除外)的 BoundedOutOfOrdernessTimestampExtractor 而不是事件时间戳 istelf 吗?
什么是乱序可以实现而 allowedlateness 不能实现的以及在什么情况下可以实现?