我是 Apache Flink 的新手,想了解 DataStream 和 Table API 之间的用例。请帮助我了解何时选择 Table API 而不是 DataStream API。
根据我的理解,可以使用 Table API 完成的事情也可以使用 DataStream API 完成。两种 API 有何不同?
我按照Reporter的说明将 flink 指标导出到普罗米修斯,但似乎默认情况下它只将与作业管理器相关的指标导出到普罗米修斯,见下文:

打开http://localhost:9249/,我只得到以下信息,没有找到与任务或任务管理器相关的指标。
# HELP flink_jobmanager_Status_JVM_Memory_Mapped_MemoryUsed MemoryUsed (scope: jobmanager_Status_JVM_Memory_Mapped)
# TYPE flink_jobmanager_Status_JVM_Memory_Mapped_MemoryUsed gauge
flink_jobmanager_Status_JVM_Memory_Mapped_MemoryUsed{host="localhost",} 0.0
# HELP flink_jobmanager_Status_JVM_ClassLoader_ClassesUnloaded ClassesUnloaded (scope: jobmanager_Status_JVM_ClassLoader)
# TYPE flink_jobmanager_Status_JVM_ClassLoader_ClassesUnloaded gauge
flink_jobmanager_Status_JVM_ClassLoader_ClassesUnloaded{host="localhost",} 0.0
# HELP flink_jobmanager_Status_JVM_GarbageCollector_PS_Scavenge_Time Time (scope: jobmanager_Status_JVM_GarbageCollector_PS_Scavenge)
# TYPE flink_jobmanager_Status_JVM_GarbageCollector_PS_Scavenge_Time gauge
flink_jobmanager_Status_JVM_GarbageCollector_PS_Scavenge_Time{host="localhost",} 273.0
# HELP flink_jobmanager_job_lastCheckpointRestoreTimestamp lastCheckpointRestoreTimestamp (scope: jobmanager_job)
# TYPE flink_jobmanager_job_lastCheckpointRestoreTimestamp gauge
flink_jobmanager_job_lastCheckpointRestoreTimestamp{job_id="dfac65e575f318970e0225eab9688a2e",host="localhost",job_name="Popular_Places_to_Elasticsearch",} -1.0
# HELP flink_jobmanager_job_lastCheckpointAlignmentBuffered lastCheckpointAlignmentBuffered (scope: jobmanager_job)
# TYPE flink_jobmanager_job_lastCheckpointAlignmentBuffered gauge
flink_jobmanager_job_lastCheckpointAlignmentBuffered{job_id="dfac65e575f318970e0225eab9688a2e",host="localhost",job_name="Popular_Places_to_Elasticsearch",} 0.0
# HELP flink_jobmanager_job_lastCheckpointExternalPath lastCheckpointExternalPath (scope: jobmanager_job)
# TYPE flink_jobmanager_job_lastCheckpointExternalPath gauge
flink_jobmanager_job_lastCheckpointExternalPath{job_id="dfac65e575f318970e0225eab9688a2e",host="localhost",job_name="Popular_Places_to_Elasticsearch",} 0.0 …Run Code Online (Sandbox Code Playgroud) 我正在尝试将一个非常简单的窗口函数应用于 Apache Flink 中的有限数据流(本地,无集群)。这是示例:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env
.fromCollection(List("a", "b", "c", "d", "e"))
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
.trigger(ProcessingTimeTrigger.create)
.process(new ProcessAllWindowFunction[String, String, TimeWindow] {
override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
out.collect(elements.toList.sorted.toString())
}
})
.print()
env.execute()
Run Code Online (Sandbox Code Playgroud)
在这里,我尝试将在一秒钟内到达窗口的所有元素分组,然后只打印这些组。
我假设所有元素都将在不到一秒的时间内生成并进入一个窗口,因此print(). 但是,当我运行它时,根本没有打印任何内容。
如果我删除所有窗口的东西,比如
val env = StreamExecutionEnvironment.getExecutionEnvironment
env
.fromCollection(List("a", "b", "c", "d", "e"))
.print()
Run Code Online (Sandbox Code Playgroud)
我看到运行后打印的元素。我也用文件源试过这个,没有区别。
我机器上的默认并行度是 6。如果我试验并行度和延迟的级别,像这样
val env = StreamExecutionEnvironment.createLocalEnvironment(2)
env
.fromCollection(List("a", "b", "c", "d", "e"))
.map { x => Thread.sleep(1500); x }
Run Code Online (Sandbox Code Playgroud)
我能够将一些——不是全部——元素分组,然后打印出来。
我的第一个假设是源的完成速度远快于 …
我收到以下错误
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:连接被拒绝:localhost/127.0.0.1:8081
在尝试使用 Flink 流式传输数据时。大约一个月前,我的代码运行良好,现在出现此错误。之前我还能够通过在http://localhost:8081上访问 Flink Web Dashboard 来监控流媒体进度,现在我的浏览器返回一个错误,它无法访问服务器“localhost”。可能是什么问题呢?先感谢您。
错误消息的完整输出:
org.apache.flink.client.program.ProgramInvocationException:无法检索执行结果。(JobID: cfa44a8fd6a62f51ef8c0f956d55ee56) at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:260) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java): org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66) at examples.WaterLevelKafka.main(WaterLevelKafka.java:124) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun。 reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink 。客户。UserGroupInformation.doAs(UserGroupInformation.java:1836) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java: 1120) 由以下原因引起:org.apache.flink.runtime.client.JobSubmissionException:无法提交 JobGraph。在 org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:379) 在 java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870) 在 java.util.concurrent。 CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:19.apache7) at org .flink。flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497) 在 org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java) :459) 在 org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884) 在 org.apache.flink.shaded.netty4.io.netty.util。 concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) 在 java.lang.Thread.run(Thread.java:745) 引起:java.util.concurrent.CompletionException:org.apache.flink.runtime.concurrent.FutureUtils$ RetryException: 无法完成操作。重试次数已用完。在 java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326) 在 java.util.concurrent.CompletableFuture。completeRelay(CompletableFuture.java:338) at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911) at java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899) ... 22 more作者:org.apache.flink.runtime.concurrent.FutureUtils$RetryException:无法完成操作。重试次数已用完。... 20 多个引起:java.util.concurrent.CompletionException:org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:连接被拒绝:localhost/127.0.0.1:8081 at …
Added Depedency Pom Details :
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.7.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_2.11</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.11</artifactId>
<version>1.7.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-filesystem_2.11</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.11</artifactId>
<version>1.7.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-s3-fs-hadoop</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop</artifactId>
<version>1.7.1</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>2.8.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.8.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.8.5</version>
</dependency> …Run Code Online (Sandbox Code Playgroud) 是否有命名UIDfor 运算符的最佳实践?它可以是简单的东西吗
stream.flatMap(new FlatMapFunc).uid("1")
.assignTimestampsAndWatermarks(new TimestampExtractor).uid("2")
.keyBy(r => r.key )
.timeWindow(Time.minutes(10))
.allowedLateness(Time.minutes(30))
.process(new ProcessFunc).uid("3")
Run Code Online (Sandbox Code Playgroud)
或者有一些命名 uid 的规则/建议?
使用 flink SQL API,我想将多个表连接在一起并在时间窗口内进行一些计算。我有 3 个来自 CSV 文件的表,一个来自 Kafka。在 Kafka 表中,我有一个字段timestampMs,我想将其用于我的时间窗口操作。
为此,我做了以下代码:
reamExecutionEnvironment env = ... ;
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
TableSource table1 = CsvTableSource.builder()
.path("path/to/file1.csv")
.ignoreFirstLine()
.fieldDelimiter(",")
.field("id1", Types.STRING)
.field("someInfo1", Types.FLOAT)
.build();
TableSource table2 = CsvTableSource.builder()
.path("path/to/file2.csv")
.ignoreFirstLine()
.fieldDelimiter(",")
.field("id2", Types.STRING)
.field("someInfo2", Types.STRING)
.build();
TableSource table3 = CsvTableSource.builder()
.path("path/to/file3.csv")
.ignoreFirstLine()
.fieldDelimiter(",")
.field("id2", Types.STRING)
.field("id1", Types.STRING)
.field("someInfo3", Types.FLOAT)
.build();
tableEnv.registerTableSource("Table1",table1);
tableEnv.registerTableSource("Table2",table2);
tableEnv.registerTableSource("Table3",table3);
Schema schemaExt = new Schema().schema(SOME_SCHEMA);
schemaExt = schemaExt.field("rowtime", Types.SQL_TIMESTAMP).rowtime(new Rowtime().timestampsFromField("timestampMs").watermarksPeriodicBounded(40000));
tableEnv.connect(new Kafka()
.version("universal")
.topic(MY_TOPIC)
.properties(MY_PROPERTIES)
.sinkPartitionerRoundRobin() …Run Code Online (Sandbox Code Playgroud) 我有一个 Flink 程序,它接收两个流,即数据/传感器读数流和警报规则流。我正在广播规则流并将其连接到数据流以生成动态警报。ProcessingTime 一切正常,但 EventTime 没有任何效果。我已经为我的数据流分配了时间戳和水印,并按原样传递了规则流(因为规则流只有在看到新的附加规则/更新时才会有记录)。但不会生成警报。
当两个流,即一个带有时间戳和水印的流和一个只有规则(广播)的流被连接并根据规则动态处理时,如何使用“EventTime”来生成警报。
我是否一定需要为我的规则流分配时间戳和水印?
因为我的规则流只有在有任何添加/修改时才会有记录。是否有任何解决方法或技巧可以避免/克服这种情况?
任何帮助/建议将不胜感激。
——我试过了!我尝试使用一个流,即数据流,通过使用硬编码窗口规则生成警报。它工作正常。但是当我将它与规则流连接时,它无法生成任何警报/输出。
使用“ProcessingTime”一切正常,但使用“EventTime”则不然。
——我所期待的!我希望我的程序能够工作,当我将连续数据流与非连续规则流连接时,使用“EventTime”生成动态警报。
我已经阅读了 Flink 关于状态后端的官方文档,这里。特别是,我对RocksDBStateBackend很感兴趣。
我不明白,如果我启用这种后端,RocksDB 是否可以通过 Flink 集群内的另一个节点从TaskManagers访问?
到目前为止,我对 RocksDBStateBackend 的理解是任务管理器将状态存储在它们的内存中,即 JVM 进程的内存中。之后,他们会将状态发送到存储在 RocksDB 中吗?如果是,Flink 集群中的 RocksDB 在哪里?物理上在哪里?
我的 Flink 管道目前使用包含一些列表和映射(字符串)的 Pojo,沿着
public class MyPojo {
private List<String> myList = new ArrayList<>();
private OtherPojo otherPojo = new OtherPojo();
// getters + setters...
}
public class OtherPojo {
private Map<String, String> myMap = new HashMap<>();
// getters + setters...
}
Run Code Online (Sandbox Code Playgroud)
出于性能原因,我想绕过 Kryo 序列化,所以我禁用了通用回退,env.getConfig().disableGenericTypes();如Flink 文档中所述。
现在,Flink 抱怨列表:
Exception in thread "main" java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type.
at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:319)
at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:311)
at …Run Code Online (Sandbox Code Playgroud)