我研究了 Apache Flink 的新 Stateful Functions 2.0 API。我阅读了以下文档链接https://ci.apache.org/projects/flink/flink-statefun-docs-stable/。我还在 Git 存储库中运行了示例。( https://github.com/ververica/stateful-functions/tree/master/stateful-functions-examples )\n我对实现有几个问题。
\n\nhttps://flink.apache.org/stateful-functions.html --> 页面末尾有一个示例,用于欺诈检测的交易评分。
\n\n第一个问题是关于状态 TTL 的。我如何向 TTL 提供状态?示例表示:30 天后,\xe2\x80\x9cFraud Count\xe2\x80\x9d 函数将收到一条过期消息(来自其自身)并清除其状态。我应该阅读本手册还是还有其他功能?我该如何做这本手册?
\n\n关于keyedstream的第二个问题。示例表示: \xe2\x80\x9cFraud Count\xe2\x80\x9d 的多个实例将存在 \xe2\x80\x94 例如,每个客户帐户一个。我应该给 赋值PersistedTable<K,V>吗?例如<customerid,count>。我可以清除特定键的状态吗?
最后一个问题是关于窗口和水印的。如何将这些功能实现到 Stateful Functions 2.0?
\nstate stream-processing apache-flink flink-streaming flink-statefun
我有以下 Flink 作业,我尝试使用后端类型 RockDB 的键控流状态函数(MapState),
environment
.addSource(consumer).name("MyKafkaSource").uid("kafka-id")
.flatMap(pojoMapper).name("MyMapFunction").uid("map-id")
.keyBy(new MyKeyExtractor())
.map(new MyRichMapFunction()).name("MyRichMapFunction").uid("rich-map-id")
.addSink(sink).name("MyFileSink").uid("sink-id")
Run Code Online (Sandbox Code Playgroud)
MyRichMapFunction 是一个有状态函数,它扩展了 RichMapFunction,它具有以下代码,
public static class MyRichMapFunction extends RichMapFunction<MyEvent, MyEvent> {
private transient MapState<String, Boolean> cache;
@Override
public void open(Configuration config) {
MapStateDescriptor<String, Boolean> descriptor =
new MapStateDescriptor("seen-values", TypeInformation.of(new TypeHint<String>() {}), TypeInformation.of(new TypeHint<Boolean>() {}));
cache = getRuntimeContext().getMapState(descriptor);
}
@Override
public MyEvent map(MyEvent value) throws Exception {
if (cache.contains(value.getEventId())) {
value.setIsSeenAlready(Boolean.TRUE);
return value;
}
value.setIsSeenAlready(Boolean.FALSE);
cache.put(value.getEventId(), Boolean.TRUE)
return value;
}
}
Run Code Online (Sandbox Code Playgroud)
将来,我想重新调整并行度(从 2 到 4),所以我的问题是,如何实现可重新扩展的键控状态,以便在更改并行度后我可以将相应的缓存键控数据获取到其相应的任务槽。我试图探索这一点,我在这里找到了一个文档。据此,可以通过使用ListCheckPointed接口来实现可重新扩展的操作符状态,该接口为此提供了snapshotState/restoreState方法。但不确定如何实现可重新扩展的键控状态(MyRichMapFunction)?我是否需要为 MyRichMapFunction …
最近我用 Docker-compose 启动了一个 flink 集群。这是我的文件:
version: "2.1"
services:
jobmanager:
image: flink:1.9.2-scala_2.11
expose:
- "6123"
ports:
- "8081:8081"
command: jobmanager
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
taskmanager:
image: flink:1.9.2-scala_2.11
expose:
- "6121"
- "6122"
depends_on:
- jobmanager
command: taskmanager
links:
- "jobmanager:jobmanager"
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
Run Code Online (Sandbox Code Playgroud)
我从“docker-compose up -d --scale taskmanager=3”开始分配 3 个任务管理器。我已经通过了 WordCount 演示,当我在 bash 中输入一些单词时,没有发生错误。
在 Flink 1.11 DataStream API 页面上,有一个使用keyBy()的WindowWordCount程序,但是,此方法已被弃用,我找不到任何关于如何在不使用keyBy()的情况下重写它的示例。任何建议将不胜感激。
我使用Intellij;它警告 keyBy() 已被弃用。这是我能找到的唯一链接。
Apache Flink 通过从检查点恢复作业来保证故障时的一次性处理和恢复,检查点是分布式数据流和算子状态的一致快照(分布式快照的 Chandy-Lamport 算法)。 这保证了故障转移时恰好一次。
在集群正常运行的情况下,Flink 如何保证一次处理,例如给定一个从外部源(比如 Kafka)读取的 Flink 源,Flink 如何保证事件从源读取一次?事件源和 Flink 源之间是否存在任何类型的应用程序级别确认?另外,Flink 如何保证事件从上游算子到下游算子只传播一次?这是否也需要对收到的事件进行任何类型的确认?
当我需要使用 I/O(查询 DB、调用第三个 API,...)时,我可以使用 RichAsyncFunction。但我需要通过 GG Sheet API 与 Google Sheet 进行交互: https: //developers.google.com/sheets/api/quickstart/java。这个API是同步的。我写了下面的代码片段:
public class SendGGSheetFunction extends RichAsyncFunction<Obj, String> {
@Override
public void asyncInvoke(Obj message, final ResultFuture<String> resultFuture) {
CompletableFuture.supplyAsync(() -> {
syncSendToGGSheet(message);
return "";
}).thenAccept((String result) -> {
resultFuture.complete(Collections.singleton(result));
});
}
}
Run Code Online (Sandbox Code Playgroud)
但我发现消息发送到GGSheet非常慢,看起来是同步发送的。
示例代码:
from pyflink.table import EnvironmentSettings, StreamTableEnvironment
env_settings = (
EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
)
table_env = StreamTableEnvironment.create(environment_settings=env_settings)
table_env.execute_sql(
"""
CREATE TABLE table1 (
id INT,
ts TIMESTAMP(3),
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector.type' = 'filesystem',
'format.type' = 'csv',
'connector.path' = '/home/alex/work/test-flink/data1.csv'
)
"""
)
table_env.execute_sql(
"""
CREATE TABLE table2 (
id2 INT,
ts2 TIMESTAMP(3),
WATERMARK FOR ts2 AS ts2 - INTERVAL '5' SECOND
) WITH (
'connector.type' = 'filesystem',
'format.type' = 'csv',
'connector.path' = '/home/alex/work/test-flink/data2.csv'
) …Run Code Online (Sandbox Code Playgroud) 我的工作有状态操作员并且还启用了检查点。staful 操作员的任务之一由于某种原因失败,已重新启动并恢复检查点状态。
我想问以下哪一个是重启行为:
我需要延迟处理某些事件。
例如。我有三个事件(发布在 Kafka 上):
我需要立即处理记录A和C,而记录B需要十分钟后处理。在 Apache Flink 中这是否可行?
到目前为止,无论我研究过什么,“触发器”似乎可能有助于在 Flink 中实现它,但尚未能够正确实现它。
我也查看了 Kafka 文档,但看起来不太可行。
apache-kafka apache-flink flink-streaming flink-cep flink-sql
我正在按照Flink 官方教程在本机 Kubernetes 中启动会话。
首先,我创建了一个干净的新集群。
然而,运行后
./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-first-flink-cluster
Run Code Online (Sandbox Code Playgroud)
my-first-flink-cluster-xxx我在刚刚创建的pod 日志中收到错误:
2021-08-14 18:33:02,519 WARN io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - Exec Failure: HTTP 403, Status: 403 - pods is forbidden: User "system:serviceaccount:default:default" cannot watch resource "pods" in API group "" in the namespace "default"
java.net.ProtocolException: Expected HTTP 101 response but was '403 Forbidden'
at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.checkResponse(RealWebSocket.java:229) [flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:196) [flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:206) [flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32) [flink-dist_2.12-1.13.1.jar:1.13.1]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_302]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_302]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_302]
2021-08-14 18:33:02,585 INFO org.apache.flink.kubernetes.kubeclient.resources.KubernetesPodsWatcher [] - The …Run Code Online (Sandbox Code Playgroud) apache-flink ×10
apache-kafka ×2
flink-sql ×2
docker ×1
exactly-once ×1
flink-cep ×1
join ×1
kubernetes ×1
python ×1
state ×1