我是 Flink 的新手,试图了解如何最有效地使用它。
我正在尝试使用 Window API,从 CSV 文件中读取。读取的行被转换为案例类,因此
case class IncomingDataUnit (
sensorUUID: String, radiationLevel: Int,photoSensor: Float,
humidity: Float,timeStamp: Long, ambientTemperature: Float)
extends Serializable {
}
Run Code Online (Sandbox Code Playgroud)
而且,这就是我阅读行的方式:
env.readTextFile(inputPath).map(datum => {
val fields = datum.split(",")
IncomingDataUnit(
fields(0), // sensorUUID
fields(1).toInt, // radiationLevel
fields(2).toFloat, // photoSensor
fields(3).toFloat, // humidity
fields(4).toLong, // timeStamp
fields(5).toFloat // ambientTemperature
)
})
Run Code Online (Sandbox Code Playgroud)
后来,使用一个简单的窗口,我尝试打印最大环境温度,因此:
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
val readings =
readIncomingReadings(env,"./sampleIOTTiny.csv")
.map(e => (e.sensorUUID,e.ambientTemperature))
.timeWindowAll(Time.of(5,TimeUnit.MILLISECONDS))
.trigger(CountTrigger.of(5))
.evictor(CountEvictor.of(4))
.max(1)
readings.print
Run Code Online (Sandbox Code Playgroud)
输出包含这些(来自一堆 DEBUG 日志语句):
1> (probe-987f2cb6,29.43)
1> (probe-987f2cb6,29.43)
3> (probe-dccefede,30.02) …Run Code Online (Sandbox Code Playgroud) 我想在 Flink 中的 WindowedStream 上执行一些操作,比如平均操作。但可用的预定义操作非常有限,例如求和、最小值、最大值等。
val windowedStream = valueStream
.keyBy(0)
.timeWindow(Time.minutes(5))
.sum(2) //Change this to average?
Run Code Online (Sandbox Code Playgroud)
假设我想求平均值,我该怎么做呢?
我有来自 Kafka 的消息流,如下所示
DataStream<String> messageStream = env
.addSource(new FlinkKafkaConsumer09<>(topic, new MsgPackDeserializer(), props));
Run Code Online (Sandbox Code Playgroud)
如何迭代流中的每条消息并对其执行某些操作?我看到一个iterate()方法,DataStream但它不返回Iterator<String>.
我有一个流,其中记录按顺序到达。我应用了一个 map 函数,然后在它上面应用了 keyBy 函数。记录的顺序是否会在每个具有相同键的记录流中保持?
Ordering of Records in Stream 中有一个类似的问题。但是我在那里给出的答案和从链接“ https://ci.apache.org/projects/flink/flink-docs-release-1.2/concepts/programming-model.html ”复制的以下描述之间感到困惑。
" 在重新分配交换中,元素之间的排序仅保留在每对发送和接收子任务中(例如,map() 的子任务 [1] 和 keyBy/window 的子任务 [2])。所以在这个例子中,保留每个键内的排序,但并行性确实引入了关于不同键的聚合结果到达接收器的顺序的不确定性。”
在给出的示例中,keyBy 的子任务 [2] 接收来自 map 的子任务 [1] 和子任务 [2] 的元素。如果仅在子任务之间维护排序,如何保留每个键内的排序?
我是 Apache Flink 的新手,想了解 DataStream 和 Table API 之间的用例。请帮助我了解何时选择 Table API 而不是 DataStream API。
根据我的理解,可以使用 Table API 完成的事情也可以使用 DataStream API 完成。两种 API 有何不同?
从 Flink 文档中,我看到有两个不同的窗口对象:
timeWindow(Time.seconds(5))而且window(TumblingWindow/SlidingWindow) ....,我对它们之间的区别感到困惑,尤其是timeWindow,它是 aSlidingWindow还是TumblingWindow?
我正在尝试将一个非常简单的窗口函数应用于 Apache Flink 中的有限数据流(本地,无集群)。这是示例:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env
.fromCollection(List("a", "b", "c", "d", "e"))
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
.trigger(ProcessingTimeTrigger.create)
.process(new ProcessAllWindowFunction[String, String, TimeWindow] {
override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
out.collect(elements.toList.sorted.toString())
}
})
.print()
env.execute()
Run Code Online (Sandbox Code Playgroud)
在这里,我尝试将在一秒钟内到达窗口的所有元素分组,然后只打印这些组。
我假设所有元素都将在不到一秒的时间内生成并进入一个窗口,因此print(). 但是,当我运行它时,根本没有打印任何内容。
如果我删除所有窗口的东西,比如
val env = StreamExecutionEnvironment.getExecutionEnvironment
env
.fromCollection(List("a", "b", "c", "d", "e"))
.print()
Run Code Online (Sandbox Code Playgroud)
我看到运行后打印的元素。我也用文件源试过这个,没有区别。
我机器上的默认并行度是 6。如果我试验并行度和延迟的级别,像这样
val env = StreamExecutionEnvironment.createLocalEnvironment(2)
env
.fromCollection(List("a", "b", "c", "d", "e"))
.map { x => Thread.sleep(1500); x }
Run Code Online (Sandbox Code Playgroud)
我能够将一些——不是全部——元素分组,然后打印出来。
我的第一个假设是源的完成速度远快于 …
我收到以下错误
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:连接被拒绝:localhost/127.0.0.1:8081
在尝试使用 Flink 流式传输数据时。大约一个月前,我的代码运行良好,现在出现此错误。之前我还能够通过在http://localhost:8081上访问 Flink Web Dashboard 来监控流媒体进度,现在我的浏览器返回一个错误,它无法访问服务器“localhost”。可能是什么问题呢?先感谢您。
错误消息的完整输出:
org.apache.flink.client.program.ProgramInvocationException:无法检索执行结果。(JobID: cfa44a8fd6a62f51ef8c0f956d55ee56) at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:260) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java): org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66) at examples.WaterLevelKafka.main(WaterLevelKafka.java:124) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun。 reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink 。客户。UserGroupInformation.doAs(UserGroupInformation.java:1836) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java: 1120) 由以下原因引起:org.apache.flink.runtime.client.JobSubmissionException:无法提交 JobGraph。在 org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:379) 在 java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870) 在 java.util.concurrent。 CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:19.apache7) at org .flink。flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497) 在 org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java) :459) 在 org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884) 在 org.apache.flink.shaded.netty4.io.netty.util。 concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) 在 java.lang.Thread.run(Thread.java:745) 引起:java.util.concurrent.CompletionException:org.apache.flink.runtime.concurrent.FutureUtils$ RetryException: 无法完成操作。重试次数已用完。在 java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326) 在 java.util.concurrent.CompletableFuture。completeRelay(CompletableFuture.java:338) at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911) at java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899) ... 22 more作者:org.apache.flink.runtime.concurrent.FutureUtils$RetryException:无法完成操作。重试次数已用完。... 20 多个引起:java.util.concurrent.CompletionException:org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:连接被拒绝:localhost/127.0.0.1:8081 at …
是否有命名UIDfor 运算符的最佳实践?它可以是简单的东西吗
stream.flatMap(new FlatMapFunc).uid("1")
.assignTimestampsAndWatermarks(new TimestampExtractor).uid("2")
.keyBy(r => r.key )
.timeWindow(Time.minutes(10))
.allowedLateness(Time.minutes(30))
.process(new ProcessFunc).uid("3")
Run Code Online (Sandbox Code Playgroud)
或者有一些命名 uid 的规则/建议?
b/w Native Kubernetes和Kubernetes部署的主要区别是什么?
我是 Kubernetes 的新手,并试图了解它们上的 Flink 部署有何不同。如果对内部结构有任何见解,那将有很大帮助。