标签: flink-sql

Flink中的Lookup和Processing Time Temporal join有什么区别?

在我看来,Processing Time Temporal Join用于流和外部数据库,并且始终join根据连接条件获取外部数据库中的最新值。另外,Processing Time Temporal Jointhe external table is not feasible to materialize the table as a dynamic table within Flink.

类似地,Lookup Join用于流和外部数据库,并且始终是look up基于连接条件的外部数据库中的值。

Flink中会Lookup Join具体化外部数据库表吗?他们之间有什么区别?

apache-flink flink-streaming flink-sql

4
推荐指数
1
解决办法
981
查看次数

尽管使用间隔连接,但“行时间属性不得位于常规连接的输入行中”,但仅限于事件时间戳

示例代码:

from pyflink.table import EnvironmentSettings, StreamTableEnvironment


env_settings = (
    EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
)
table_env = StreamTableEnvironment.create(environment_settings=env_settings)

table_env.execute_sql(
    """
    CREATE TABLE table1 (
        id INT,
        ts TIMESTAMP(3),
        WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
    ) WITH (
        'connector.type' = 'filesystem',
        'format.type' = 'csv',
        'connector.path' = '/home/alex/work/test-flink/data1.csv'
    )
"""
)


table_env.execute_sql(
    """
    CREATE TABLE table2 (
        id2 INT,
        ts2 TIMESTAMP(3),
        WATERMARK FOR ts2 AS ts2 - INTERVAL '5' SECOND
    ) WITH (
        'connector.type' = 'filesystem',
        'format.type' = 'csv',
        'connector.path' = '/home/alex/work/test-flink/data2.csv'
    ) …
Run Code Online (Sandbox Code Playgroud)

python join apache-flink flink-streaming flink-sql

3
推荐指数
1
解决办法
1361
查看次数

Apache Flink 延迟处理某些事件

我需要延迟处理某些事件。

例如。我有三个事件(发布在 Kafka 上):

  • A(id:1,重试时间:现在)
  • B(id:2,重试时间:10分钟后)
  • C(id:3,重试时间:现在)

我需要立即处理记录A和C,而记录B需要十分钟后处理。在 Apache Flink 中这是否可行?

到目前为止,无论我研究过什么,“触发器”似乎可能有助于在 Flink 中实现它,但尚未能够正确实现它。

我也查看了 Kafka 文档,但看起来不太可行。

apache-kafka apache-flink flink-streaming flink-cep flink-sql

3
推荐指数
1
解决办法
1957
查看次数

Apache Flink:如何使用Table API查询关系数据库?

以下代码段摘自该博客文章

val sensorTable = ??? // can be a CSV file, Kafka topic, database, or ...

// register the table source
tEnv.registerTableSource("sensors", sensorTable)
Run Code Online (Sandbox Code Playgroud)

我想从关系数据库中读取数据。Flink是否有TableSource用于JDBC数据库的?

apache-flink flink-streaming flink-sql

2
推荐指数
1
解决办法
1067
查看次数

Apache Flink中DataStream和Table API的区别

我是 Apache Flink 的新手,想了解 DataStream 和 Table API 之间的用例。请帮助我了解何时选择 Table API 而不是 DataStream API。

根据我的理解,可以使用 Table API 完成的事情也可以使用 DataStream API 完成。两种 API 有何不同?

apache-flink flink-streaming flink-sql

2
推荐指数
1
解决办法
1245
查看次数

Flink : Rowtime 属性不能在常规连接的输入行中

使用 flink SQL API,我想将多个表连接在一起并在时间窗口内进行一些计算。我有 3 个来自 CSV 文件的表,一个来自 Kafka。在 Kafka 表中,我有一个字段timestampMs,我想将其用于我的时间窗口操作。

为此,我做了以下代码:

reamExecutionEnvironment env = ... ;
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

TableSource table1 = CsvTableSource.builder()
        .path("path/to/file1.csv")
        .ignoreFirstLine()
        .fieldDelimiter(",")
        .field("id1", Types.STRING)
        .field("someInfo1", Types.FLOAT)
        .build();

TableSource table2 = CsvTableSource.builder()
        .path("path/to/file2.csv")
        .ignoreFirstLine()
        .fieldDelimiter(",")
        .field("id2", Types.STRING)
        .field("someInfo2", Types.STRING)
        .build();

TableSource table3 = CsvTableSource.builder()
        .path("path/to/file3.csv")
        .ignoreFirstLine()
        .fieldDelimiter(",")
        .field("id2", Types.STRING)
        .field("id1", Types.STRING)
        .field("someInfo3", Types.FLOAT)
        .build();

tableEnv.registerTableSource("Table1",table1);
tableEnv.registerTableSource("Table2",table2);
tableEnv.registerTableSource("Table3",table3);


Schema schemaExt = new Schema().schema(SOME_SCHEMA);
schemaExt = schemaExt.field("rowtime", Types.SQL_TIMESTAMP).rowtime(new Rowtime().timestampsFromField("timestampMs").watermarksPeriodicBounded(40000));

tableEnv.connect(new Kafka()
        .version("universal")
        .topic(MY_TOPIC)
        .properties(MY_PROPERTIES)
        .sinkPartitionerRoundRobin() …
Run Code Online (Sandbox Code Playgroud)

java apache-flink flink-sql

2
推荐指数
1
解决办法
1800
查看次数

Flink 中的事件时间窗口不触发

当我使用 flink 事件时间窗口时,该窗口不会触发。请问如何解决问题,有什么方法可以调试吗?

apache-flink flink-streaming flink-sql

1
推荐指数
1
解决办法
1635
查看次数

Flink 架构与表架构

我正在使用 Flink SQL API,我在所有“模式”类型之间有点迷失:TableSchemaSchema(来自org.apache.flink.table.descriptors.Schema)和TypeInformation

ATableSchema可以从 a 创建TypeInformation,aTypeInformation可以从 a 创建TableSchema,aSchema可以从 a 创建TableSchema

但看起来 aSchema无法转换回TypeInformationor TableSchema(?)

为什么有 3 种不同类型的对象来存储同一种信息?

例如,假设我有一个来自 Avro 架构文件的字符串架构,并且我想向其中添加一个新字段。为此,我找到的唯一解决方案是:

String mySchemaRaw = ...;
TypeInformation<Row> typeInfo = AvroSchemaConverter.convertToTypeInfo(mySchemaRaw);
Schema newSchema = new Schema().schema(TableSchema.fromTypeInfo(typeInfo));
newSchema = newSchema.field("nexField",...);


// Need the newSchema as a TableSchema 
Run Code Online (Sandbox Code Playgroud)

这是使用这些对象的正常方式吗?(我觉得很奇怪)

java apache-flink flink-sql

1
推荐指数
1
解决办法
1253
查看次数

Apache Flink 中的 KeyBy 与 GroupBy

Flink 中 KeyBy 和 GroupBy 的异同点是什么?如果在 Table only 程序中使用 Table/SQL API,GroupBy 是否等同于 KeyBy?

apache-flink flink-streaming flink-sql

1
推荐指数
1
解决办法
556
查看次数

我可以使用 Flink 的文件系统连接器作为查找表吗?

Yarn 上的 Flink 1.13.2 (Flink SQL)。

\n

有点困惑 - 我发现了两个(据我所知)不同规格的文件系统连接器(Ververica.com 与 ci.apache.org):

\n
    \n
  1. https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/overview/#supported-connectors \xe2\x80\x94 文件系统是“有界和无界扫描、查找

    \n
  2. \n
  3. https://docs.ververica.com/user_guide/sql_development/connectors.html#packaged-connectors \xe2\x80\x94 仅 JDBC 标记为可用于查找。

    \n
  4. \n
\n

我可以使用文件系统连接器 (csv) 创建查找(维度)表来丰富 Kafka 事件表吗?如果是的话——如何使用 Flink SQL?

\n

(我尝试过简单的左连接FOR SYSTEM_TIME AS OF a.event_datetime- 它在具有少量 Kafka 事件的测试环境中有效,但在生产中我收到GC overhead limit exceeded错误。我猜这是因为没有将小型 csv 表广播到工作节点。在 Spark 中,我曾经使用相关提示来解决这些类型问题。)

\n

apache-flink flink-streaming flink-sql

1
推荐指数
1
解决办法
485
查看次数

Flink 表 api 文件系统连接器不可用

我正在尝试使用 flink table api (1.15.1 版本)写入本地文件系统。我正在使用 TableDescriptor.forConnector('filesystem') 但出现异常:无法找到在类路径中实现 DynamicTableFactory 的标识符“filesystem”的任何工厂。

谢谢:)

apache-flink flink-streaming flink-sql flink-batch flink-table-api

1
推荐指数
1
解决办法
588
查看次数

Flink:在类路径中找不到适合“org.apache.flink.table.factories.DeserializationSchemaFactory”的表工厂

我用的是flink的table api,从kafka接收数据,然后注册成表,然后用sql语句处理,最后将结果转回流,写入目录,代码如下:

def main(args: Array[String]): Unit = { 

    val sEnv = StreamExecutionEnvironment.getExecutionEnvironment 
    sEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 

    val tEnv = TableEnvironment.getTableEnvironment(sEnv) 

   tEnv.connect( 
      new Kafka() 
        .version("0.11") 
        .topic("user") 
        .startFromEarliest() 
        .property("zookeeper.connect", "") 
        .property("bootstrap.servers", "") 
    ) 
      .withFormat( 
        new Json() 
          .failOnMissingField(false) 
          .deriveSchema()   //???? schema 
      ) 
      .withSchema( 
        new Schema() 
          .field("username_skey", Types.STRING) 
      ) 
      .inAppendMode() 
      .registerTableSource("user") 
     val userTest: Table = tEnv.sqlQuery( 
      """ 
       select ** form ** join **"".stripMargin) 
    val endStream = tEnv.toRetractStream[Row](userTest) 
    endStream.writeAsText("/tmp/sqlres",WriteMode.OVERWRITE) 
    sEnv.execute("Test_New_Sign_Student") 
 } 
Run Code Online (Sandbox Code Playgroud)

我本地测试成功,但是在集群中提交如下命令时,出现如下错误:

================================================== ======

org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error. 
        at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546) 
        at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421) 
        at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:426) 
        at …
Run Code Online (Sandbox Code Playgroud)

flink-streaming flink-sql

0
推荐指数
1
解决办法
6039
查看次数

如何高效地将flink pipeline中的数据写入redis

我正在 Apache flink sql api 中构建管道。该管道执行简单的投影查询。但是,我需要在查询之前编写一次元组(确切地说是每个元组中的一些元素),在查询之后编写一次。事实证明,我用来写入 Redis 的代码严重降低了性能。即flink在很小的数据速率下做出反压。我的代码有什么问题以及如何改进。有什么建议请。

当我停止写入redis之前和之后,性能都非常好。这是我的管道代码:

public class QueryExample {
    public static Long throughputCounterAfter=new Long("0");
    public static void main(String[] args) {
        int k_partitions = 10;
        reamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(5 * 32);
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "zookeeper-node-01:2181");
        props.setProperty("bootstrap.servers", "kafka-node-01:9092,kafka-node-02:9092,kafka-node-03:9092");
        // not to be shared with another job consuming the same topic
        props.setProperty("group.id", "flink-group");
        props.setProperty("enable.auto.commit","false");
        FlinkKafkaConsumer011<String> purchasesConsumer=new FlinkKafkaConsumer011<String>("purchases",
                new SimpleStringSchema(),
                props);

        DataStream<String> purchasesStream = env
                .addSource(purchasesConsumer)
                .setParallelism(Math.min(5 * 32, k_partitions));
        DataStream<Tuple4<Integer, Integer, Integer, Long>> purchaseWithTimestampsAndWatermarks …
Run Code Online (Sandbox Code Playgroud)

redis backpressure apache-flink flink-streaming flink-sql

0
推荐指数
1
解决办法
2314
查看次数