标签: flink-streaming

apache flink:如何解释 DataStream.print 输出?

我是 Flink 的新手,试图了解如何最有效地使用它。

我正在尝试使用 Window API,从 CSV 文件中读取。读取的行被转换为案例类,因此

case class IncomingDataUnit (
sensorUUID: String, radiationLevel: Int,photoSensor: Float,
humidity: Float,timeStamp: Long, ambientTemperature: Float)
  extends Serializable {

}
Run Code Online (Sandbox Code Playgroud)

而且,这就是我阅读行的方式:

env.readTextFile(inputPath).map(datum => {
      val fields = datum.split(",")
      IncomingDataUnit(
        fields(0),              // sensorUUID
        fields(1).toInt,        // radiationLevel
        fields(2).toFloat,      // photoSensor
        fields(3).toFloat,      // humidity
        fields(4).toLong,       // timeStamp
        fields(5).toFloat       // ambientTemperature
      )
    })
Run Code Online (Sandbox Code Playgroud)

后来,使用一个简单的窗口,我尝试打印最大环境温度,因此:

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

val readings =
      readIncomingReadings(env,"./sampleIOTTiny.csv")
      .map(e => (e.sensorUUID,e.ambientTemperature))
      .timeWindowAll(Time.of(5,TimeUnit.MILLISECONDS))
      .trigger(CountTrigger.of(5))
      .evictor(CountEvictor.of(4))
      .max(1)

readings.print
Run Code Online (Sandbox Code Playgroud)

输出包含这些(来自一堆 DEBUG 日志语句):

1> (probe-987f2cb6,29.43)
1> (probe-987f2cb6,29.43)
3> (probe-dccefede,30.02) …
Run Code Online (Sandbox Code Playgroud)

scala apache-flink flink-streaming

2
推荐指数
1
解决办法
4115
查看次数

Flink中如何对WindowedStream进行自定义操作?

我想在 Flink 中的 WindowedStream 上执行一些操作,比如平均操作。但可用的预定义操作非常有限,例如求和、最小值、最大值等。

val windowedStream = valueStream
                          .keyBy(0)
                          .timeWindow(Time.minutes(5))
                          .sum(2) //Change this to average?
Run Code Online (Sandbox Code Playgroud)

假设我想求平均值,我该怎么做呢?

apache-flink flink-streaming

2
推荐指数
1
解决办法
399
查看次数

如何迭代 Flink DataStream 中的每条消息?

我有来自 Kafka 的消息流,如下所示

DataStream<String> messageStream = env
  .addSource(new FlinkKafkaConsumer09<>(topic, new MsgPackDeserializer(), props));
Run Code Online (Sandbox Code Playgroud)

如何迭代流中的每条消息并对其执行某些操作?我看到一个iterate()方法,DataStream但它不返回Iterator<String>.

apache-flink flink-streaming

2
推荐指数
1
解决办法
2041
查看次数

Flink 中键控流中的记录排序

我有一个流,其中记录按顺序到达。我应用了一个 map 函数,然后在它上面应用了 keyBy 函数。记录的顺序是否会在每个具有相同键的记录流中保持?

Ordering of Records in Stream 中有一个类似的问题。但是我在那里给出的答案和从链接“ https://ci.apache.org/projects/flink/flink-docs-release-1.2/concepts/programming-model.html ”复制的以下描述之间感到困惑。

" 在重新分配交换中,元素之间的排序仅保留在每对发送和接收子任务中(例如,map() 的子任务 [1] 和 keyBy/window 的子任务 [2])。所以在这个例子中,保留每个键内的排序,但并行性确实引入了关于不同键的聚合结果到达接收器的顺序的不确定性。”

在给出的示例中,keyBy 的子任务 [2] 接收来自 map 的子任务 [1] 和子任务 [2] 的元素。如果仅在子任务之间维护排序,如何保留每个键内的排序?

apache-flink flink-streaming

2
推荐指数
1
解决办法
819
查看次数

Apache Flink中DataStream和Table API的区别

我是 Apache Flink 的新手,想了解 DataStream 和 Table API 之间的用例。请帮助我了解何时选择 Table API 而不是 DataStream API。

根据我的理解,可以使用 Table API 完成的事情也可以使用 DataStream API 完成。两种 API 有何不同?

apache-flink flink-streaming flink-sql

2
推荐指数
1
解决办法
1245
查看次数

Flink中window和timewindow有什么区别

从 Flink 文档中,我看到有两个不同的窗口对象: timeWindow(Time.seconds(5))而且window(TumblingWindow/SlidingWindow) ....,我对它们之间的区别感到困惑,尤其是timeWindow,它是 aSlidingWindow还是TumblingWindow?

flink-streaming

2
推荐指数
1
解决办法
421
查看次数

处理时间窗口不适用于 Apache Flink 中的有限数据源

我正在尝试将一个非常简单的窗口函数应用于 Apache Flink 中的有限数据流(本地,无集群)。这是示例:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env
  .fromCollection(List("a", "b", "c", "d", "e"))

  .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
  .trigger(ProcessingTimeTrigger.create)
  .process(new ProcessAllWindowFunction[String, String, TimeWindow] {
    override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
      out.collect(elements.toList.sorted.toString())
    }
  })

  .print()

env.execute()
Run Code Online (Sandbox Code Playgroud)

在这里,我尝试将在一秒钟内到达窗口的所有元素分组,然后只打印这些组。

我假设所有元素都将在不到一秒的时间内生成并进入一个窗口,因此print(). 但是,当我运行它时,根本没有打印任何内容

如果我删除所有窗口的东西,比如

val env = StreamExecutionEnvironment.getExecutionEnvironment
env
  .fromCollection(List("a", "b", "c", "d", "e"))
  .print()
Run Code Online (Sandbox Code Playgroud)

我看到运行后打印的元素。我也用文件源试过这个,没有区别。

我机器上的默认并行度是 6。如果我试验并行度和延迟的级别,像这样

val env = StreamExecutionEnvironment.createLocalEnvironment(2)
env
  .fromCollection(List("a", "b", "c", "d", "e"))
  .map { x => Thread.sleep(1500); x }
Run Code Online (Sandbox Code Playgroud)

我能够将一些——不是全部——元素分组,然后打印出来。

我的第一个假设是源的完成速度远快于 …

apache-flink flink-streaming

2
推荐指数
1
解决办法
913
查看次数

Flink 连接被拒绝:localhost/127.0.0.1:8081

我收到以下错误

org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:连接被拒绝:localhost/127.0.0.1:8081

在尝试使用 Flink 流式传输数据时。大约一个月前,我的代码运行良好,现在出现此错误。之前我还能够通过在http://localhost:8081上访问 Flink Web Dashboard 来监控流媒体进度,现在我的浏览器返回一个错误,它无法访问服务器“localhost”。可能是什么问题呢?先感谢您。

错误消息的完整输出:

org.apache.flink.client.program.ProgramInvocationException:无法检索执行结果。(JobID: cfa44a8fd6a62f51ef8c0f956d55ee56) at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:260) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java): org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66) at examples.WaterLevelKafka.main(WaterLevelKafka.java:124) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun。 reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink 。客户。UserGroupInformation.doAs(UserGroupInformation.java:1836) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java: 1120) 由​​以下原因引起:org.apache.flink.runtime.client.JobSubmissionException:无法提交 JobGraph。在 org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:379) 在 java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870) 在 java.util.concurrent。 CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:19.apache7) at org .flink。flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497) 在 org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java) :459) 在 org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884) 在 org.apache.flink.shaded.netty4.io.netty.util。 concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) 在 java.lang.Thread.run(Thread.java:745) 引起:java.util.concurrent.CompletionException:org.apache.flink.runtime.concurrent.FutureUtils$ RetryException: 无法完成操作。重试次数已用完。在 java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326) 在 java.util.concurrent.CompletableFuture。completeRelay(CompletableFuture.java:338) at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911) at java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899) ... 22 more作者:org.apache.flink.runtime.concurrent.FutureUtils$RetryException:无法完成操作。重试次数已用完。... 20 多个引起:java.util.concurrent.CompletionException:org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:连接被拒绝:localhost/127.0.0.1:8081 at …

apache-flink flink-streaming

2
推荐指数
1
解决办法
4593
查看次数

将 flink uid 命名为 operator 的最佳实践

是否有命名UIDfor 运算符的最佳实践?它可以是简单的东西吗

stream.flatMap(new FlatMapFunc).uid("1")
    .assignTimestampsAndWatermarks(new TimestampExtractor).uid("2")
    .keyBy(r => r.key )
    .timeWindow(Time.minutes(10))
    .allowedLateness(Time.minutes(30))
    .process(new ProcessFunc).uid("3")
Run Code Online (Sandbox Code Playgroud)

或者有一些命名 uid 的规则/建议?

apache-flink flink-streaming

2
推荐指数
1
解决办法
1326
查看次数

Flink 在 Kubernetes 和 Native Kubernetes 上的部署有何不同

b/w Native KubernetesKubernetes部署的主要区别是什么?

我是 Kubernetes 的新手,并试图了解它们上的 Flink 部署有何不同。如果对内部结构有任何见解,那将有很大帮助。

kubernetes apache-flink flink-streaming

2
推荐指数
1
解决办法
456
查看次数