我想知道为什么我确实需要创建自己的 RichSinkFunction 或使用 JDBCOutputFormat 连接数据库,而不是仅仅使用 SinkFunction 中的传统 PostgreSQL 驱动程序创建连接、执行查询并关闭连接?
我发现很多文章都说这样做,但没有解释为什么?有什么不同?
使用 JDBCOutputFormat 的代码示例,
JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
.setDrivername("org.postgresql.Driver")
.setDBUrl("jdbc:postgresql://localhost:1234/test?user=xxx&password=xxx")
.setQuery(query)
.setSqlTypes(new int[] { Types.VARCHAR, Types.VARCHAR, Types.VARCHAR }) //set the types
.finish();
Run Code Online (Sandbox Code Playgroud)
实现自己的 RichSinkFunction 的代码示例,
public class RichCaseSink extends RichSinkFunction<Case> {
private static final String UPSERT_CASE = "INSERT INTO public.cases (caseid, tracehash) "
+ "VALUES (?, ?) "
+ "ON CONFLICT (caseid) DO UPDATE SET "
+ " tracehash=?";
private PreparedStatement statement;
@Override
public void invoke(Case aCase) throws Exception {
statement.setString(1, …Run Code Online (Sandbox Code Playgroud) 我在一个 java 类中定义某些变量,并使用不同的类访问它,以便过滤流中的唯一元素。请参考代码以更好地理解该问题。
我面临的问题是这个过滤器功能不能很好地工作并且无法过滤独特的事件。我怀疑该变量在不同线程之间共享,这就是原因!?如果这不是正确的方法,请建议另一种方法。提前致谢。
**ClassWithVariables.java**
public static HashMap<String, ArrayList<String>> uniqueMap = new HashMap<>();
**FilterClass.java**
public boolean filter(String val) throws Exception {
if(ClassWithVariables.uniqueMap.containsKey(key)) {
Arraylist<String> al = uniqueMap.get(key);
if(al.contains(val) {
return false;
} else {
//Update the hashmap list(uniqueMap)
return true;
}
} else {
//Add to hashmap list(uniqueMap)
return true;
}
}
Run Code Online (Sandbox Code Playgroud) Akka 程序更难调试。我们还需要跟踪许多参与者的状态来诊断问题。有什么解释为什么 Flink 选择 Akka 吗?
sinkfunction我正在尝试理解和的概念richsinkfunction。然而,我找不到一个详细解释其底层概念的网站,但我找到的是它的 API 文档。
谁能给我简单解释一下flink中的两个函数sinkfunctionand ?richsinkfunction
我试图了解 Flink 中的并行性是如何工作的。该文档https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/programming-model.html似乎表明接收器的并行度等于1。就我而言,我是在我的接收器中写入 HBase - 这是否意味着只有一个任务(线程?)将写入 HBase?它没有为应用程序设置全局并行度吗?
我正在使用 Flink SQL API,我在所有“模式”类型之间有点迷失:TableSchema、Schema(来自org.apache.flink.table.descriptors.Schema)和TypeInformation。
ATableSchema可以从 a 创建TypeInformation,aTypeInformation可以从 a 创建TableSchema,aSchema可以从 a 创建TableSchema
但看起来 aSchema无法转换回TypeInformationor TableSchema(?)
为什么有 3 种不同类型的对象来存储同一种信息?
例如,假设我有一个来自 Avro 架构文件的字符串架构,并且我想向其中添加一个新字段。为此,我找到的唯一解决方案是:
String mySchemaRaw = ...;
TypeInformation<Row> typeInfo = AvroSchemaConverter.convertToTypeInfo(mySchemaRaw);
Schema newSchema = new Schema().schema(TableSchema.fromTypeInfo(typeInfo));
newSchema = newSchema.field("nexField",...);
// Need the newSchema as a TableSchema
Run Code Online (Sandbox Code Playgroud)
这是使用这些对象的正常方式吗?(我觉得很奇怪)
我正在我的 Flink 应用程序中读取来自 kinesis 的事件。事件采用 protobuf 格式。如果我'com.google.protobuf:protobuf-java:3.7.1'在 flink 应用程序中使用 with ,我就没有问题。但是,如果我将其更改为'com.google.protobuf:protobuf-java:3.10.0'我会得到上述堆栈跟踪异常
java.lang.IncompatibleClassChangeError: class com.google.protobuf.Descriptors$OneofDescriptor has interface com.google.protobuf.Descriptors$GenericDescriptor as super class
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
at java.lang.Class.privateGetPublicMethods(Class.java:2902)
at java.lang.Class.privateGetPublicMethods(Class.java:2917)
at java.lang.Class.getMethods(Class.java:1615)
at org.apache.flink.api.java.typeutils.TypeExtractor.isValidPojoField(TypeExtractor.java:1786)
at org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1856)
at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1746)
at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1643)
at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:921)
at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:781)
at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:735)
at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:731)
at org.apache.flink.api.common.typeinfo.TypeInformation.of(TypeInformation.java:211)
at org.apache.flink.api.java.typeutils.ListTypeInfo.<init>(ListTypeInfo.java:45)
at …Run Code Online (Sandbox Code Playgroud) 我正在尝试将 Apache Flink 用于 IoT 应用程序。我有一堆可能处于多种状态之一的设备。当设备更改状态时,它会发出一条消息,其中包含事件时间戳和更改后的状态。对于一台设备,这可能如下所示:
{Device_id:1,Event_Timestamp:9:01,状态:STATE_1}
{Device_id:1,Event_Timestamp:9:03,状态:STATE_2}
对于每个设备,我需要为给定的五分钟窗口内设备在每个状态下花费的时间量生成一个五分钟的聚合。为了做到这一点,我计划使用键控状态来存储每个设备的最后状态更新,以便我知道设备在聚合窗口开始时处于什么状态。例如,假设 ID 为“1”的设备有一个键控状态值,表示它在 8:58 进入“STATE_2”。那么 9:00 - 9:05 窗口的聚合输出将如下所示(基于上面的两个示例事件):
{Device_id:1,时间戳:9:00,状态:STATE_1,持续时间:120 秒}
{Device_id:1,时间戳:9:00,状态:STATE_2,持续时间:180 秒}
我的问题是这样的:如果窗口有事件,Flink 只会为给定的 device_id 打开一个窗口。这意味着,如果设备超过 5 分钟没有更改状态,则不会有任何记录进入流,因此窗口不会打开。但是,我需要发出一条记录,表明设备在整个五分钟内处于基于密钥状态中存储的内容的当前状态。例如,Flink 应该发出一条 9:05-9:10 的记录,表明 id 为“1”的设备在“STATE_2”中花费了全部 300 秒。
有没有办法输出每个设备在五分钟聚合窗口中处于给定状态的时间量的记录,即使状态在这五分钟内没有改变,因此设备不发送任何事件?如果没有,是否有任何解决方法可以用来获取应用程序所需的输出事件?
对于 Flink 流/Flink 有状态功能,已知较小setBufferTimeout的值(例如 5 毫秒)将提供“最佳”延迟体验。在优化 Flink 流或有状态函数作业中的延迟时,必须注意的其他推荐配置值(设置、重置、修改......)是什么?
我有一个关于将数据放入 Kafka 主题的流作业,还有另一个使用 kafka 主题中的数据的流作业。我的Kafka集群的分区号的配置是3。
当我将作业的并行度设置为 时4,只有 3 个插槽忙于生成数据,并且只有 3 个消费者子任务获取数据。
考虑到任务槽数量的限制,我想将并行度改为1. 但是,当我将作业的并行度设置为 时1,只1 consumer task slot获得了数据。
在我看来,即使我将并行度设置为1,我仍然将数据下沉到3个分区中,并且可以有3个消费者子任务消费数据。
为什么只有一个消费者子任务在这里工作?