我试图了解 Flink 中的并行性是如何工作的。该文档https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/programming-model.html似乎表明接收器的并行度等于1。就我而言,我是在我的接收器中写入 HBase - 这是否意味着只有一个任务(线程?)将写入 HBase?它没有为应用程序设置全局并行度吗?
我正在尝试将 Apache Flink 用于 IoT 应用程序。我有一堆可能处于多种状态之一的设备。当设备更改状态时,它会发出一条消息,其中包含事件时间戳和更改后的状态。对于一台设备,这可能如下所示:
{Device_id:1,Event_Timestamp:9:01,状态:STATE_1}
{Device_id:1,Event_Timestamp:9:03,状态:STATE_2}
对于每个设备,我需要为给定的五分钟窗口内设备在每个状态下花费的时间量生成一个五分钟的聚合。为了做到这一点,我计划使用键控状态来存储每个设备的最后状态更新,以便我知道设备在聚合窗口开始时处于什么状态。例如,假设 ID 为“1”的设备有一个键控状态值,表示它在 8:58 进入“STATE_2”。那么 9:00 - 9:05 窗口的聚合输出将如下所示(基于上面的两个示例事件):
{Device_id:1,时间戳:9:00,状态:STATE_1,持续时间:120 秒}
{Device_id:1,时间戳:9:00,状态:STATE_2,持续时间:180 秒}
我的问题是这样的:如果窗口有事件,Flink 只会为给定的 device_id 打开一个窗口。这意味着,如果设备超过 5 分钟没有更改状态,则不会有任何记录进入流,因此窗口不会打开。但是,我需要发出一条记录,表明设备在整个五分钟内处于基于密钥状态中存储的内容的当前状态。例如,Flink 应该发出一条 9:05-9:10 的记录,表明 id 为“1”的设备在“STATE_2”中花费了全部 300 秒。
有没有办法输出每个设备在五分钟聚合窗口中处于给定状态的时间量的记录,即使状态在这五分钟内没有改变,因此设备不发送任何事件?如果没有,是否有任何解决方法可以用来获取应用程序所需的输出事件?
对于 Flink 流/Flink 有状态功能,已知较小setBufferTimeout的值(例如 5 毫秒)将提供“最佳”延迟体验。在优化 Flink 流或有状态函数作业中的延迟时,必须注意的其他推荐配置值(设置、重置、修改......)是什么?
Yarn 上的 Flink 1.13.2 (Flink SQL)。
\n有点困惑 - 我发现了两个(据我所知)不同规格的文件系统连接器(Ververica.com 与 ci.apache.org):
\nhttps://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/overview/#supported-connectors \xe2\x80\x94 文件系统是“有界和无界扫描、查找”
\nhttps://docs.ververica.com/user_guide/sql_development/connectors.html#packaged-connectors \xe2\x80\x94 仅 JDBC 标记为可用于查找。
\n我可以使用文件系统连接器 (csv) 创建查找(维度)表来丰富 Kafka 事件表吗?如果是的话——如何使用 Flink SQL?
\n(我尝试过简单的左连接FOR SYSTEM_TIME AS OF a.event_datetime- 它在具有少量 Kafka 事件的测试环境中有效,但在生产中我收到GC overhead limit exceeded错误。我猜这是因为没有将小型 csv 表广播到工作节点。在 Spark 中,我曾经使用相关提示来解决这些类型问题。)
我使用KeyedCoProcessFunction函数用来自另一个流的数据来丰富主数据流
代码:
class AssetDataEnrichment extends KeyedCoProcessFunction[String, PacketData, AssetCommandState, AssetData] with LazyLogging {
case class AssetStateDoc(assetId: Option[String])
private var associatedDevices: ValueState[AssetStateDoc] = _
override def open(parameters: Configuration): Unit = {
val associatedDevicesDescriptor =
new ValueStateDescriptor[AssetStateDoc]("associatedDevices", classOf[AssetStateDoc])
associatedDevices = getRuntimeContext.getState[AssetStateDoc](associatedDevicesDescriptor)
}
override def processElement1(
packet: PacketData,
ctx: KeyedCoProcessFunction[String, PacketData, AssetCommandState, AssetData]#Context,
out: Collector[AssetData]): Unit = {
val tmpState = associatedDevices.value
val state = if (tmpState == null) AssetStateDoc(None) else tmpState
state.assetId match {
case Some(assetId) =>
logger.debug(s"There are state for …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用 flink table api (1.15.1 版本)写入本地文件系统。我正在使用 TableDescriptor.forConnector('filesystem') 但出现异常:无法找到在类路径中实现 DynamicTableFactory 的标识符“filesystem”的任何工厂。
谢谢:)
apache-flink flink-streaming flink-sql flink-batch flink-table-api
我正在使用 intellij、maven 3 和 flink 1.15.1 编写有状态流作业。我正在尝试为我的自定义 KeyedProcessFunction 编写单元测试,并尝试遵循此处的文档以及添加此处提到的依赖项。我对使用 KeyedOneInputStreamOperatorTestHarness 类感兴趣,但我在任何依赖项中都找不到该类(下面发布的经过编辑的 pom)。我唯一能找到该课程的方法是包含
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.14.5</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
Run Code Online (Sandbox Code Playgroud)
这是来自以前的版本。Flink >= 1.15.0 现在不需要 scala 依赖库,因此 1.15.1 没有 flink-streaming-java_2.12。
我想要使用的类是否被移到其他地方或故意排除?我已经尝试了我所知道的所有 google foo,并且可以在 flink 存储库中找到该类,但在我迄今为止尝试过的任何 flink 依赖项中都找不到该类,除了 <=1.14.5 之外。我是否做错了什么或缺少一些文档?使用 1.14.5 库是我唯一的选择还是有一些我不知道的新型测试实用程序?
没有该类的 Pom 依赖项:
...
<flink.version>1.15.1</flink.version>
...
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils-junit</artifactId>
<version>${flink.version}</version>
<scope>test</scope> …Run Code Online (Sandbox Code Playgroud) 我想创建一个Trigger第一次在20秒内触发,此后每5秒触发一次的触发。我已经习惯GlobalWindows了Trigger
val windowedStream = valueStream
.keyBy(0)
.window(GlobalWindows.create())
.trigger(TradeTrigger.of())
Run Code Online (Sandbox Code Playgroud)
这是中的代码TradeTrigger:
@PublicEvolving
public class TradeTrigger<W extends Window> extends Trigger<Object, W> {
private static final long serialVersionUID = 1L;
static boolean flag=false;
static long ctime = System.currentTimeMillis();
private TradeTrigger() {
}
@Override
public TriggerResult onElement(
Object arg0,
long arg1,
W arg2,
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg3)
throws Exception {
// TODO Auto-generated method stub
if(flag == false){
if((System.currentTimeMillis()-ctime) >= 20000){
flag = true;
ctime = System.currentTimeMillis();
return TriggerResult.FIRE;
} …Run Code Online (Sandbox Code Playgroud) 我用的是flink的table api,从kafka接收数据,然后注册成表,然后用sql语句处理,最后将结果转回流,写入目录,代码如下:
def main(args: Array[String]): Unit = {
val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
sEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tEnv = TableEnvironment.getTableEnvironment(sEnv)
tEnv.connect(
new Kafka()
.version("0.11")
.topic("user")
.startFromEarliest()
.property("zookeeper.connect", "")
.property("bootstrap.servers", "")
)
.withFormat(
new Json()
.failOnMissingField(false)
.deriveSchema() //???? schema
)
.withSchema(
new Schema()
.field("username_skey", Types.STRING)
)
.inAppendMode()
.registerTableSource("user")
val userTest: Table = tEnv.sqlQuery(
"""
select ** form ** join **"".stripMargin)
val endStream = tEnv.toRetractStream[Row](userTest)
endStream.writeAsText("/tmp/sqlres",WriteMode.OVERWRITE)
sEnv.execute("Test_New_Sign_Student")
}
Run Code Online (Sandbox Code Playgroud)
我本地测试成功,但是在集群中提交如下命令时,出现如下错误:
================================================== ======
org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error.
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:426)
at …Run Code Online (Sandbox Code Playgroud) 我在读一个简单的JSON字符串作为输入和键控基于两个领域流A和B。但KeyBy产生了对不同值的相同键合流B,但为的特定组合A和B。
输入:
{
"A": "352580084349898",
"B": "1546559127",
"C": "A"
}
Run Code Online (Sandbox Code Playgroud)
这是我的Flink代码的核心逻辑:
DataStream<GenericDataObject> genericDataObjectDataStream = inputStream
.map(new MapFunction<String, GenericDataObject>() {
@Override
public GenericDataObject map(String s) throws Exception {
JSONObject jsonObject = new JSONObject(s);
GenericDataObject genericDataObject = new GenericDataObject();
genericDataObject.setA(jsonObject.getString("A"));
genericDataObject.setB(jsonObject.getString("B"));
genericDataObject.setC(jsonObject.getString("C"));
return genericDataObject;
}
});
DataStream<GenericDataObject> testStream = genericDataObjectDataStream
.keyBy("A", "B")
.map(new MapFunction<GenericDataObject, GenericDataObject>() {
@Override
public GenericDataObject map(GenericDataObject genericDataObject) throws Exception {
return genericDataObject;
}
});
testStream.print();
Run Code Online (Sandbox Code Playgroud)
GenericDataObject是一个POJO,具有三个字段A, …