在我看来,Processing Time Temporal Join用于流和外部数据库,并且始终join根据连接条件获取外部数据库中的最新值。另外,Processing Time Temporal Join当the external table is not feasible to materialize the table as a dynamic table within Flink.
类似地,Lookup Join用于流和外部数据库,并且始终是look up基于连接条件的外部数据库中的值。
Flink中会Lookup Join具体化外部数据库表吗?他们之间有什么区别?
示例代码:
from pyflink.table import EnvironmentSettings, StreamTableEnvironment
env_settings = (
EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
)
table_env = StreamTableEnvironment.create(environment_settings=env_settings)
table_env.execute_sql(
"""
CREATE TABLE table1 (
id INT,
ts TIMESTAMP(3),
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector.type' = 'filesystem',
'format.type' = 'csv',
'connector.path' = '/home/alex/work/test-flink/data1.csv'
)
"""
)
table_env.execute_sql(
"""
CREATE TABLE table2 (
id2 INT,
ts2 TIMESTAMP(3),
WATERMARK FOR ts2 AS ts2 - INTERVAL '5' SECOND
) WITH (
'connector.type' = 'filesystem',
'format.type' = 'csv',
'connector.path' = '/home/alex/work/test-flink/data2.csv'
) …Run Code Online (Sandbox Code Playgroud) 我需要延迟处理某些事件。
例如。我有三个事件(发布在 Kafka 上):
我需要立即处理记录A和C,而记录B需要十分钟后处理。在 Apache Flink 中这是否可行?
到目前为止,无论我研究过什么,“触发器”似乎可能有助于在 Flink 中实现它,但尚未能够正确实现它。
我也查看了 Kafka 文档,但看起来不太可行。
apache-kafka apache-flink flink-streaming flink-cep flink-sql
以下代码段摘自该博客文章:
val sensorTable = ??? // can be a CSV file, Kafka topic, database, or ...
// register the table source
tEnv.registerTableSource("sensors", sensorTable)
Run Code Online (Sandbox Code Playgroud)
我想从关系数据库中读取数据。Flink是否有TableSource用于JDBC数据库的?
我是 Apache Flink 的新手,想了解 DataStream 和 Table API 之间的用例。请帮助我了解何时选择 Table API 而不是 DataStream API。
根据我的理解,可以使用 Table API 完成的事情也可以使用 DataStream API 完成。两种 API 有何不同?
使用 flink SQL API,我想将多个表连接在一起并在时间窗口内进行一些计算。我有 3 个来自 CSV 文件的表,一个来自 Kafka。在 Kafka 表中,我有一个字段timestampMs,我想将其用于我的时间窗口操作。
为此,我做了以下代码:
reamExecutionEnvironment env = ... ;
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
TableSource table1 = CsvTableSource.builder()
.path("path/to/file1.csv")
.ignoreFirstLine()
.fieldDelimiter(",")
.field("id1", Types.STRING)
.field("someInfo1", Types.FLOAT)
.build();
TableSource table2 = CsvTableSource.builder()
.path("path/to/file2.csv")
.ignoreFirstLine()
.fieldDelimiter(",")
.field("id2", Types.STRING)
.field("someInfo2", Types.STRING)
.build();
TableSource table3 = CsvTableSource.builder()
.path("path/to/file3.csv")
.ignoreFirstLine()
.fieldDelimiter(",")
.field("id2", Types.STRING)
.field("id1", Types.STRING)
.field("someInfo3", Types.FLOAT)
.build();
tableEnv.registerTableSource("Table1",table1);
tableEnv.registerTableSource("Table2",table2);
tableEnv.registerTableSource("Table3",table3);
Schema schemaExt = new Schema().schema(SOME_SCHEMA);
schemaExt = schemaExt.field("rowtime", Types.SQL_TIMESTAMP).rowtime(new Rowtime().timestampsFromField("timestampMs").watermarksPeriodicBounded(40000));
tableEnv.connect(new Kafka()
.version("universal")
.topic(MY_TOPIC)
.properties(MY_PROPERTIES)
.sinkPartitionerRoundRobin() …Run Code Online (Sandbox Code Playgroud) 当我使用 flink 事件时间窗口时,该窗口不会触发。请问如何解决问题,有什么方法可以调试吗?
我正在使用 Flink SQL API,我在所有“模式”类型之间有点迷失:TableSchema、Schema(来自org.apache.flink.table.descriptors.Schema)和TypeInformation。
ATableSchema可以从 a 创建TypeInformation,aTypeInformation可以从 a 创建TableSchema,aSchema可以从 a 创建TableSchema
但看起来 aSchema无法转换回TypeInformationor TableSchema(?)
为什么有 3 种不同类型的对象来存储同一种信息?
例如,假设我有一个来自 Avro 架构文件的字符串架构,并且我想向其中添加一个新字段。为此,我找到的唯一解决方案是:
String mySchemaRaw = ...;
TypeInformation<Row> typeInfo = AvroSchemaConverter.convertToTypeInfo(mySchemaRaw);
Schema newSchema = new Schema().schema(TableSchema.fromTypeInfo(typeInfo));
newSchema = newSchema.field("nexField",...);
// Need the newSchema as a TableSchema
Run Code Online (Sandbox Code Playgroud)
这是使用这些对象的正常方式吗?(我觉得很奇怪)
Flink 中 KeyBy 和 GroupBy 的异同点是什么?如果在 Table only 程序中使用 Table/SQL API,GroupBy 是否等同于 KeyBy?
Yarn 上的 Flink 1.13.2 (Flink SQL)。
\n有点困惑 - 我发现了两个(据我所知)不同规格的文件系统连接器(Ververica.com 与 ci.apache.org):
\nhttps://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/overview/#supported-connectors \xe2\x80\x94 文件系统是“有界和无界扫描、查找”
\nhttps://docs.ververica.com/user_guide/sql_development/connectors.html#packaged-connectors \xe2\x80\x94 仅 JDBC 标记为可用于查找。
\n我可以使用文件系统连接器 (csv) 创建查找(维度)表来丰富 Kafka 事件表吗?如果是的话——如何使用 Flink SQL?
\n(我尝试过简单的左连接FOR SYSTEM_TIME AS OF a.event_datetime- 它在具有少量 Kafka 事件的测试环境中有效,但在生产中我收到GC overhead limit exceeded错误。我猜这是因为没有将小型 csv 表广播到工作节点。在 Spark 中,我曾经使用相关提示来解决这些类型问题。)
我正在尝试使用 flink table api (1.15.1 版本)写入本地文件系统。我正在使用 TableDescriptor.forConnector('filesystem') 但出现异常:无法找到在类路径中实现 DynamicTableFactory 的标识符“filesystem”的任何工厂。
谢谢:)
apache-flink flink-streaming flink-sql flink-batch flink-table-api
我用的是flink的table api,从kafka接收数据,然后注册成表,然后用sql语句处理,最后将结果转回流,写入目录,代码如下:
def main(args: Array[String]): Unit = {
val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
sEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tEnv = TableEnvironment.getTableEnvironment(sEnv)
tEnv.connect(
new Kafka()
.version("0.11")
.topic("user")
.startFromEarliest()
.property("zookeeper.connect", "")
.property("bootstrap.servers", "")
)
.withFormat(
new Json()
.failOnMissingField(false)
.deriveSchema() //???? schema
)
.withSchema(
new Schema()
.field("username_skey", Types.STRING)
)
.inAppendMode()
.registerTableSource("user")
val userTest: Table = tEnv.sqlQuery(
"""
select ** form ** join **"".stripMargin)
val endStream = tEnv.toRetractStream[Row](userTest)
endStream.writeAsText("/tmp/sqlres",WriteMode.OVERWRITE)
sEnv.execute("Test_New_Sign_Student")
}
Run Code Online (Sandbox Code Playgroud)
我本地测试成功,但是在集群中提交如下命令时,出现如下错误:
================================================== ======
org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error.
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:426)
at …Run Code Online (Sandbox Code Playgroud) 我正在 Apache flink sql api 中构建管道。该管道执行简单的投影查询。但是,我需要在查询之前编写一次元组(确切地说是每个元组中的一些元素),在查询之后编写一次。事实证明,我用来写入 Redis 的代码严重降低了性能。即flink在很小的数据速率下做出反压。我的代码有什么问题以及如何改进。有什么建议请。
当我停止写入redis之前和之后,性能都非常好。这是我的管道代码:
public class QueryExample {
public static Long throughputCounterAfter=new Long("0");
public static void main(String[] args) {
int k_partitions = 10;
reamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(5 * 32);
Properties props = new Properties();
props.setProperty("zookeeper.connect", "zookeeper-node-01:2181");
props.setProperty("bootstrap.servers", "kafka-node-01:9092,kafka-node-02:9092,kafka-node-03:9092");
// not to be shared with another job consuming the same topic
props.setProperty("group.id", "flink-group");
props.setProperty("enable.auto.commit","false");
FlinkKafkaConsumer011<String> purchasesConsumer=new FlinkKafkaConsumer011<String>("purchases",
new SimpleStringSchema(),
props);
DataStream<String> purchasesStream = env
.addSource(purchasesConsumer)
.setParallelism(Math.min(5 * 32, k_partitions));
DataStream<Tuple4<Integer, Integer, Integer, Long>> purchaseWithTimestampsAndWatermarks …Run Code Online (Sandbox Code Playgroud) flink-sql ×13
apache-flink ×12
flink-streaming ×11
java ×2
apache-kafka ×1
backpressure ×1
flink-batch ×1
flink-cep ×1
join ×1
python ×1
redis ×1