当我执行Flink应用程序时,它会给我以下信息NullPointerException:
2017-08-08 13:21:57,690 INFO com.datastax.driver.core.Cluster - New Cassandra host /127.0.0.1:9042 added
2017-08-08 13:22:02,427 INFO org.apache.flink.runtime.taskmanager.Task - TriggerWindow(TumblingEventTimeWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@15d1c80b}, EventTimeTrigger(), WindowedStream.apply(CoGroupedStreams.java:302)) -> Filter -> Flat Map -> Sink: Cassandra Sink (1/1) (092a7ef50209f7a050d9d82be1e03d80) switched from RUNNING to FAILED.
java.lang.RuntimeException: Exception occurred while processing valve output watermark:
at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891) …Run Code Online (Sandbox Code Playgroud) 我想将Gelly的文档添加到我的项目中,但它给了我这个错误:
找不到的来源:org.apache.flink:flink-gelly_2.10:1.2-SNAPSHOT
这是在我的pom.xml中
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-gelly_2.10</artifactId>
<version>1.2-SNAPSHOT</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)
我试着找一个不同版本的gelly来解决这个问题,但找不到任何.有没有其他方法来获取文档?
在键控流上,我想为每个新传入事件计算一次窗口函数,新事件一到达就立即计算,同时为它提供过去 30 天内该键的所有早期事件的上下文作为迭代器.
预期行为类似于具有 30 天长度和 1 纳秒滑动的滑动窗口,每个传入事件仅计算一次窗口函数。
我看不到如何将此行为映射到带有/不带有触发器/驱逐器等的内置翻滚/滑动/会话窗口之上。
有人可以帮忙吗?或者这是否需要编写我自己的窗口分配器或我自己的键控状态处理?
如果有人给出用例来解释下面给出的每个Watermark API和Apache flink之间的区别,将会很有帮助
AssignerWithPeriodicWatermarks[T]AssignerWithPunctuatedWatermarks[T]我有一组经过加密的数据流,需要计算不同时间段(1分钟,5分钟,1天,1周)的滚动计数。
是否可以在单个应用程序中计算所有四个窗口计数?
Flink de/serialise operator state 的频率如何?每个获取/更新还是基于检查点?状态后端是否有所作为?
我怀疑在具有不同键(数百万)和每个键每秒数千个事件的键控流的情况下,反/序列化可能是一个大问题。我对吗?
我有一个来自 Kafka 的 DataStream,它对 MyModel 中的一个字段有 2 个可能的值。MyModel 是一个 pojo,具有从来自 Kafka 的消息解析的特定领域字段。
DataStream<MyModel> stream = env.addSource(myKafkaConsumer);
Run Code Online (Sandbox Code Playgroud)
我想分别在每个键 a1、a2 上应用窗口和运算符。有什么好的方法可以将它们分开?我有 2 个选项过滤和选择,但不知道哪个更快。
过滤方法
stream
.filter(<MyModel.a == a1>)
.keyBy()
.window()
.apply()
.addSink()
stream
.filter(<MyModel.a == a2>)
.keyBy()
.window()
.apply()
.addSink()
Run Code Online (Sandbox Code Playgroud)
拆分和选择方法
SplitStream<MyModel> split = stream.split(…)
split
.select(<MyModel.a == a1>)
…
.addSink()
split
.select<MyModel.a == a2>()
…
.addSink()
Run Code Online (Sandbox Code Playgroud)
如果 split 和 select 更好,如果我想根据 MyModel 中某个字段的值进行拆分,如何实现它们?
我第一次使用 flink(1.6、1.7),并使用来自https://www.gharchive.org/的 github 存档中的数据,但将该数据用作流数据源。
我的简单示例只是计算每日窗口中每个用户的所有事件,我尝试复制相同的示例,但使用 TableEnvironment 和 SQL 支持。
但是,我遇到了以下错误:
org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit 类不包含字段修改时间的设置器,如下所示:
8-12-04 14:17:02:115 INFO main exploration.StreamingTableApp:32 - Starting Streaming Table Flink App Example...
18-12-04 14:17:02:174 INFO main typeutils.TypeExtractor:1818 - class org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit does not contain a setter for field modificationTime
18-12-04 14:17:02:176 INFO main typeutils.TypeExtractor:1857 - Class class org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data …Run Code Online (Sandbox Code Playgroud) 我无法理解水印和允许迟到的概念。
以下是 [邮件存档| 的摘录] https://www.mail-archive.com/user@flink.apache.org/msg08758.html]讨论了水印,但我还有几个问题。以下是引用的例子:
假设您有一个
BoundedOutOfOrdernessTimestampExtractor2 分钟限制和 10 分钟滚动窗口,从 12:00 开始到 12:10 结束:如果您有以下流序列:
Run Code Online (Sandbox Code Playgroud)12:01, A 12:04, B WM, 12:02 // 12:04 - 2 minutes 12:02, C 12:08, D 12:14, E WM, 12:12 12:16, F WM, 12:14 // 12:16 - 2 minutes 12:09, G不允许迟到
当窗口操作符接收并评估此时
<WM, 12:12>包含的窗口时,将逻辑时间转发到12:12,并最终清除其状态。后来被忽略。[A, B, C, D]<12:09, G>允许迟到3分钟
窗口操作符在接收到窗口时评估窗口
<WM, 12:12>,但其状态尚未清除。收到后状态将被清除<WM, 12:14>(窗口触发时间 12:10 + 允许延迟 3 分钟)。<12:09, G>再次被忽略。允许迟到5分钟
窗口操作符在接收到窗口时评估窗口
<WM, 12:12>,但其状态尚未清除。当<12:09, …