标签: flink-streaming

Flink Sink 并行度 = 1?

我试图了解 Flink 中的并行性是如何工作的。该文档https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/programming-model.html似乎表明接收器的并行度等于1。就我而言,我是在我的接收器中写入 HBase - 这是否意味着只有一个任务(线程?)将写入 HBase?它没有为应用程序设置全局并行度吗?

apache-flink flink-streaming

1
推荐指数
1
解决办法
1773
查看次数

Apache Flink:即使给定聚合窗口没有输入记录到达,也会根据键控状态在 Flink 中发出输出记录

我正在尝试将 Apache Flink 用于 IoT 应用程序。我有一堆可能处于多种状态之一的设备。当设备更改状态时,它会发出一条消息,其中包含事件时间戳和更改后的状态。对于一台设备,这可能如下所示:

{Device_id:1,Event_Timestamp:9:01,状态:STATE_1}

{Device_id:1,Event_Timestamp:9:03,状态:STATE_2}

对于每个设备,我需要为给定的五分钟窗口内设备在每个状态下花费的时间量生成一个五分钟的聚合。为了做到这一点,我计划使用键控状态来存储每个设备的最后状态更新,以便我知道设备在聚合窗口开始时处于什么状态。例如,假设 ID 为“1”的设备有一个键控状态值,表示它在 8:58 进入“STATE_2”。那么 9:00 - 9:05 窗口的聚合输出将如下所示(基于上面的两个示例事件):

{Device_id:1,时间戳:9:00,状态:STATE_1,持续时间:120 秒}

{Device_id:1,时间戳:9:00,状态:STATE_2,持续时间:180 秒}

我的问题是这样的:如果窗口有事件,Flink 只会为给定的 device_id 打开一个窗口。这意味着,如果设备超过 5 分钟没有更改状态,则不会有任何记录进入流,因此窗口不会打开。但是,我需要发出一条记录,表明设备在整个五分钟内处于基于密钥状态中存储的内容的当前状态。例如,Flink 应该发出一条 9:05-9:10 的记录,表明 id 为“1”的设备在“STATE_2”中花费了全部 300 秒。

有没有办法输出每个设备在五分钟聚合窗口中处于给定状态的时间量的记录,即使状态在这五分钟内没有改变,因此设备不发送任何事件?如果没有,是否有任何解决方法可以用来获取应用程序所需的输出事件?

iot apache-flink flink-streaming

1
推荐指数
1
解决办法
568
查看次数

Flink 最小延迟的最佳配置

对于 Flink 流/Flink 有状态功能,已知较小setBufferTimeout的值(例如 5 毫秒)将提供“最佳”延迟体验。在优化 Flink 流或有状态函数作业中的延迟时,必须注意的其他推荐配置值(设置、重置、修改......)是什么?

apache-flink flink-streaming flink-statefun

1
推荐指数
1
解决办法
941
查看次数

我可以使用 Flink 的文件系统连接器作为查找表吗?

Yarn 上的 Flink 1.13.2 (Flink SQL)。

\n

有点困惑 - 我发现了两个(据我所知)不同规格的文件系统连接器(Ververica.com 与 ci.apache.org):

\n
    \n
  1. https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/overview/#supported-connectors \xe2\x80\x94 文件系统是“有界和无界扫描、查找

    \n
  2. \n
  3. https://docs.ververica.com/user_guide/sql_development/connectors.html#packaged-connectors \xe2\x80\x94 仅 JDBC 标记为可用于查找。

    \n
  4. \n
\n

我可以使用文件系统连接器 (csv) 创建查找(维度)表来丰富 Kafka 事件表吗?如果是的话——如何使用 Flink SQL?

\n

(我尝试过简单的左连接FOR SYSTEM_TIME AS OF a.event_datetime- 它在具有少量 Kafka 事件的测试环境中有效,但在生产中我收到GC overhead limit exceeded错误。我猜这是因为没有将小型 csv 表广播到工作节点。在 Spark 中,我曾经使用相关提示来解决这些类型问题。)

\n

apache-flink flink-streaming flink-sql

1
推荐指数
1
解决办法
485
查看次数

Flink KeyedCoProcessFunction 处理状态

我使用KeyedCoProcessFunction函数用来自另一个流的数据来丰富主数据流

代码:

class AssetDataEnrichment extends KeyedCoProcessFunction[String, PacketData, AssetCommandState, AssetData] with LazyLogging {

  case class AssetStateDoc(assetId: Option[String])
  private var associatedDevices: ValueState[AssetStateDoc] = _

  override def open(parameters: Configuration): Unit = {
    val associatedDevicesDescriptor =
      new ValueStateDescriptor[AssetStateDoc]("associatedDevices", classOf[AssetStateDoc])
    associatedDevices = getRuntimeContext.getState[AssetStateDoc](associatedDevicesDescriptor)
  }

  override def processElement1(
                                packet: PacketData,
                                ctx: KeyedCoProcessFunction[String, PacketData, AssetCommandState, AssetData]#Context,
                                out: Collector[AssetData]): Unit = {
    
    val tmpState = associatedDevices.value
    val state = if (tmpState == null) AssetStateDoc(None) else tmpState
    
    state.assetId match {
      case Some(assetId) =>
        logger.debug(s"There are state for …
Run Code Online (Sandbox Code Playgroud)

apache-flink flink-streaming

1
推荐指数
1
解决办法
1164
查看次数

Flink 表 api 文件系统连接器不可用

我正在尝试使用 flink table api (1.15.1 版本)写入本地文件系统。我正在使用 TableDescriptor.forConnector('filesystem') 但出现异常:无法找到在类路径中实现 DynamicTableFactory 的标识符“filesystem”的任何工厂。

谢谢:)

apache-flink flink-streaming flink-sql flink-batch flink-table-api

1
推荐指数
1
解决办法
588
查看次数

Apache Flink 是否放弃了 StreamOperatorTestHarness 类,或者它们是否转移到了不同​​的工件?

我正在使用 intellij、maven 3 和 flink 1.15.1 编写有状态流作业。我正在尝试为我的自定义 KeyedProcessFunction 编写单元测试,并尝试遵循此处的文档以及添加此处提到的依赖项。我对使用 KeyedOneInputStreamOperatorTestHarness 类感兴趣,但我在任何依赖项中都找不到该类(下面发布的经过编辑的 pom)。我唯一能找到该课程的方法是包含

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.14.5</version>
            <scope>test</scope>
            <type>test-jar</type>
        </dependency>
Run Code Online (Sandbox Code Playgroud)

这是来自以前的版本。Flink >= 1.15.0 现在不需要 scala 依赖库,因此 1.15.1 没有 flink-streaming-java_2.12。

我想要使​​用的类是否被移到其他地方或故意排除?我已经尝试了我所知道的所有 google foo,并且可以在 flink 存储库中找到该类,但在我迄今为止尝试过的任何 flink 依赖项中都找不到该类,除了 <=1.14.5 之外。我是否做错了什么或缺少一些文档?使用 1.14.5 库是我唯一的选择还是有一些我不知道的新型测试实用程序?

没有该类的 Pom 依赖项:

...
        <flink.version>1.15.1</flink.version>
...
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-test-utils</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-test-utils-junit</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope> …
Run Code Online (Sandbox Code Playgroud)

maven apache-flink flink-streaming

1
推荐指数
1
解决办法
465
查看次数

Flink自定义触发器提供了意外的输出

我想创建一个Trigger第一次在20秒内触发,此后每5秒触发一次的触发。我已经习惯GlobalWindowsTrigger

val windowedStream = valueStream
                          .keyBy(0)
                          .window(GlobalWindows.create())
                          .trigger(TradeTrigger.of())
Run Code Online (Sandbox Code Playgroud)

这是中的代码TradeTrigger

@PublicEvolving
public class TradeTrigger<W extends Window> extends Trigger<Object, W> {

    private static final long serialVersionUID = 1L;

    static boolean flag=false;
    static long ctime = System.currentTimeMillis();

    private TradeTrigger() {
    }

    @Override
    public TriggerResult onElement(
            Object arg0,
            long arg1,
            W arg2,
            org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg3)
            throws Exception {
        // TODO Auto-generated method stub

        if(flag == false){
            if((System.currentTimeMillis()-ctime) >= 20000){
               flag = true;
               ctime = System.currentTimeMillis();
               return TriggerResult.FIRE;
            } …
Run Code Online (Sandbox Code Playgroud)

apache-flink flink-streaming

0
推荐指数
1
解决办法
970
查看次数

Flink:在类路径中找不到适合“org.apache.flink.table.factories.DeserializationSchemaFactory”的表工厂

我用的是flink的table api,从kafka接收数据,然后注册成表,然后用sql语句处理,最后将结果转回流,写入目录,代码如下:

def main(args: Array[String]): Unit = { 

    val sEnv = StreamExecutionEnvironment.getExecutionEnvironment 
    sEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 

    val tEnv = TableEnvironment.getTableEnvironment(sEnv) 

   tEnv.connect( 
      new Kafka() 
        .version("0.11") 
        .topic("user") 
        .startFromEarliest() 
        .property("zookeeper.connect", "") 
        .property("bootstrap.servers", "") 
    ) 
      .withFormat( 
        new Json() 
          .failOnMissingField(false) 
          .deriveSchema()   //???? schema 
      ) 
      .withSchema( 
        new Schema() 
          .field("username_skey", Types.STRING) 
      ) 
      .inAppendMode() 
      .registerTableSource("user") 
     val userTest: Table = tEnv.sqlQuery( 
      """ 
       select ** form ** join **"".stripMargin) 
    val endStream = tEnv.toRetractStream[Row](userTest) 
    endStream.writeAsText("/tmp/sqlres",WriteMode.OVERWRITE) 
    sEnv.execute("Test_New_Sign_Student") 
 } 
Run Code Online (Sandbox Code Playgroud)

我本地测试成功,但是在集群中提交如下命令时,出现如下错误:

================================================== ======

org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error. 
        at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546) 
        at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421) 
        at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:426) 
        at …
Run Code Online (Sandbox Code Playgroud)

flink-streaming flink-sql

0
推荐指数
1
解决办法
6039
查看次数

KeyBy不会为不同的键创建不同的键流

我在读一个简单的JSON字符串作为输入和键控基于两个领域流AB。但KeyBy产生了对不同值的相同键合流B,但为的特定组合AB

输入:

{
    "A": "352580084349898",
    "B": "1546559127",
    "C": "A"
}
Run Code Online (Sandbox Code Playgroud)

这是我的Flink代码的核心逻辑:

DataStream<GenericDataObject> genericDataObjectDataStream = inputStream
            .map(new MapFunction<String, GenericDataObject>() {
                @Override
                public GenericDataObject map(String s) throws Exception {
                    JSONObject jsonObject = new JSONObject(s);
                    GenericDataObject genericDataObject = new GenericDataObject();
                    genericDataObject.setA(jsonObject.getString("A"));
                    genericDataObject.setB(jsonObject.getString("B"));
                    genericDataObject.setC(jsonObject.getString("C"));
                    return genericDataObject;
                }
            });
DataStream<GenericDataObject> testStream = genericDataObjectDataStream
            .keyBy("A", "B")
            .map(new MapFunction<GenericDataObject, GenericDataObject>() {
                @Override
                public GenericDataObject map(GenericDataObject genericDataObject) throws Exception {
                    return genericDataObject;
                }
            });
testStream.print();
Run Code Online (Sandbox Code Playgroud)

GenericDataObject是一个POJO,具有三个字段A, …

apache-flink flink-streaming

0
推荐指数
1
解决办法
329
查看次数