标签: apache-flink

Apache Flink 中的有状态函数

我研究了 Apache Flink 的新 Stateful Functions 2.0 API。我阅读了以下文档链接https://ci.apache.org/projects/flink/flink-statefun-docs-stable/。我还在 Git 存储库中运行了示例。( https://github.com/ververica/stateful-functions/tree/master/stateful-functions-examples )\n我对实现有几个问题。

\n\n

https://flink.apache.org/stateful-functions.html --> 页面末尾有一个示例,用于欺诈检测的交易评分。

\n\n

第一个问题是关于状态 TTL 的。我如何向 TTL 提供状态?示例表示:30 天后,\xe2\x80\x9cFraud Count\xe2\x80\x9d 函数将收到一条过期消息(来自其自身)并清除其状态。我应该阅读本手册还是还有其他功能?我该如何做这本手册?

\n\n

关于keyedstream的第二个问题。示例表示: \xe2\x80\x9cFraud Count\xe2\x80\x9d 的多个实例将存在 \xe2\x80\x94 例如,每个客户帐户一个。我应该给 赋值PersistedTable<K,V>吗?例如<customerid,count>。我可以清除特定键的状态吗?

\n\n

最后一个问题是关于窗口和水印的。如何将这些功能实现到 Stateful Functions 2.0?

\n

state stream-processing apache-flink flink-streaming flink-statefun

3
推荐指数
1
解决办法
1978
查看次数

Flink 可重新扩展的键控流状态函数

我有以下 Flink 作业,我尝试使用后端类型 RockDB 的键控流状态函数(MapState),

environment
.addSource(consumer).name("MyKafkaSource").uid("kafka-id")
.flatMap(pojoMapper).name("MyMapFunction").uid("map-id")
.keyBy(new MyKeyExtractor())
.map(new MyRichMapFunction()).name("MyRichMapFunction").uid("rich-map-id")
.addSink(sink).name("MyFileSink").uid("sink-id")
Run Code Online (Sandbox Code Playgroud)

MyRichMapFunction 是一个有状态函数,它扩展了 RichMapFunction,它具有以下代码,

public static class MyRichMapFunction extends RichMapFunction<MyEvent, MyEvent> {
    private transient MapState<String, Boolean> cache;
    @Override
    public void open(Configuration config) {
        MapStateDescriptor<String, Boolean> descriptor =
                new MapStateDescriptor("seen-values", TypeInformation.of(new TypeHint<String>() {}), TypeInformation.of(new TypeHint<Boolean>() {}));
        cache = getRuntimeContext().getMapState(descriptor);
    }
    @Override
    public MyEvent map(MyEvent value) throws Exception {
        if (cache.contains(value.getEventId())) {
            value.setIsSeenAlready(Boolean.TRUE);
            return value;
        }
        value.setIsSeenAlready(Boolean.FALSE);
        cache.put(value.getEventId(), Boolean.TRUE)
        return value;
    }
}
Run Code Online (Sandbox Code Playgroud)

将来,我想重新调整并行度(从 2 到 4),所以我的问题是,如何实现可重新扩展的键控状态,以便在更改并行度后我可以将相应的缓存键控数据获取到其相应的任务槽。我试图探索这一点,我在这里找到了一个文档。据此,可以通过使用ListCheckPointed接口来实现可重新扩展的操作符状态,该接口为此提供了snapshotState/restoreState方法。但不确定如何实现可重新扩展的键控状态(MyRichMapFunction)?我是否需要为 MyRichMapFunction …

apache-flink flink-streaming

3
推荐指数
1
解决办法
572
查看次数

Flink WebUI 中没有 Logs 和 Stdout

最近我用 Docker-compose 启动了一个 flink 集群。这是我的文件:

version: "2.1"
services:
  jobmanager:
    image: flink:1.9.2-scala_2.11
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

  taskmanager:
    image: flink:1.9.2-scala_2.11
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
Run Code Online (Sandbox Code Playgroud)

我从“docker-compose up -d --scale taskmanager=3”开始分配 3 个任务管理器。我已经通过了 WordCount 演示,当我在 bash 中输入一些单词时,没有发生错误。

但是,我无法从 WebUI 看到日志和标准输出 网页界面

docker apache-flink

3
推荐指数
1
解决办法
1865
查看次数

已弃用方法 DataStream.keyBy() 的替代方法

在 Flink 1.11 DataStream API 页面上,有一个使用keyBy()的WindowWordCount程序,但是,此方法已被弃用,我找不到任何关于如何在不使用keyBy()的情况下重写它的示例。任何建议将不胜感激。

我使用Intellij;它警告 keyBy() 已被弃用。这是我能找到的唯一链接

apache-flink

3
推荐指数
1
解决办法
1437
查看次数

Apache Flink 中的端到端 Exactly-once 处理

Apache Flink 通过从检查点恢复作业来保证故障时的一次性处理和恢复,检查点是分布式数据流和算子状态的一致快照(分布式快照的 Chandy-Lamport 算法)这保证了故障转移时恰好一次。

在集群正常运行的情况下,Flink 如何保证一次处理,例如给定一个从外部源(比如 Kafka)读取的 Flink 源,Flink 如何保证事件从源读取一次?事件源和 Flink 源之间是否存在任何类型的应用程序级别确认?另外,Flink 如何保证事件从上游算子到下游算子只传播一次?这是否也需要对收到的事件进行任何类型的确认?

apache-kafka apache-flink flink-streaming exactly-once

3
推荐指数
1
解决办法
2091
查看次数

我可以在 RichAsyncFunction 中编写同步代码吗

当我需要使用 I/O(查询 DB、调用第三个 API,...)时,我可以使用 RichAsyncFunction。但我需要通过 GG Sheet API 与 Google Sheet 进行交互: https: //developers.google.com/sheets/api/quickstart/java。这个API是同步的。我写了下面的代码片段:

public class SendGGSheetFunction extends RichAsyncFunction<Obj, String> {

    @Override
    public void asyncInvoke(Obj message, final ResultFuture<String> resultFuture) {
        CompletableFuture.supplyAsync(() -> {
            syncSendToGGSheet(message);
            return "";
        }).thenAccept((String result) -> {
            resultFuture.complete(Collections.singleton(result));
        });
    }

}
Run Code Online (Sandbox Code Playgroud)

但我发现消息发送到GGSheet非常慢,看起来是同步发送的。

apache-flink flink-streaming

3
推荐指数
1
解决办法
1059
查看次数

尽管使用间隔连接,但“行时间属性不得位于常规连接的输入行中”,但仅限于事件时间戳

示例代码:

from pyflink.table import EnvironmentSettings, StreamTableEnvironment


env_settings = (
    EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
)
table_env = StreamTableEnvironment.create(environment_settings=env_settings)

table_env.execute_sql(
    """
    CREATE TABLE table1 (
        id INT,
        ts TIMESTAMP(3),
        WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
    ) WITH (
        'connector.type' = 'filesystem',
        'format.type' = 'csv',
        'connector.path' = '/home/alex/work/test-flink/data1.csv'
    )
"""
)


table_env.execute_sql(
    """
    CREATE TABLE table2 (
        id2 INT,
        ts2 TIMESTAMP(3),
        WATERMARK FOR ts2 AS ts2 - INTERVAL '5' SECOND
    ) WITH (
        'connector.type' = 'filesystem',
        'format.type' = 'csv',
        'connector.path' = '/home/alex/work/test-flink/data2.csv'
    ) …
Run Code Online (Sandbox Code Playgroud)

python join apache-flink flink-streaming flink-sql

3
推荐指数
1
解决办法
1361
查看次数

如果某个任务失败,是否会重新启动整个作业

我的工作有状态操作员并且还启用了检查点。staful 操作员的任务之一由于某种原因失败,已重新启动并恢复检查点状态。

我想问以下哪一个是重启行为:

  1. 仅重新启动并恢复失败的任务
  2. 所有操作员(包含失败的任务)的任务都将重新启动并恢复
  3. 整个作业重新启动并恢复

apache-flink

3
推荐指数
1
解决办法
1485
查看次数

Apache Flink 延迟处理某些事件

我需要延迟处理某些事件。

例如。我有三个事件(发布在 Kafka 上):

  • A(id:1,重试时间:现在)
  • B(id:2,重试时间:10分钟后)
  • C(id:3,重试时间:现在)

我需要立即处理记录A和C,而记录B需要十分钟后处理。在 Apache Flink 中这是否可行?

到目前为止,无论我研究过什么,“触发器”似乎可能有助于在 Flink 中实现它,但尚未能够正确实现它。

我也查看了 Kafka 文档,但看起来不太可行。

apache-kafka apache-flink flink-streaming flink-cep flink-sql

3
推荐指数
1
解决办法
1957
查看次数

Flink:pods 被禁止:用户“system:serviceaccount:default:default”无法观看命名空间“default”中 API 组“”中的资源“pods”

我正在按照Flink 官方教程在本机 Kubernetes 中启动会话。

首先,我创建了一个干净的新集群。

然而,运行后

./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-first-flink-cluster
Run Code Online (Sandbox Code Playgroud)

my-first-flink-cluster-xxx我在刚刚创建的pod 日志中收到错误:

2021-08-14 18:33:02,519 WARN  io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - Exec Failure: HTTP 403, Status: 403 - pods is forbidden: User "system:serviceaccount:default:default" cannot watch resource "pods" in API group "" in the namespace "default"
java.net.ProtocolException: Expected HTTP 101 response but was '403 Forbidden'
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.checkResponse(RealWebSocket.java:229) [flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:196) [flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:206) [flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32) [flink-dist_2.12-1.13.1.jar:1.13.1]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_302]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_302]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_302]
2021-08-14 18:33:02,585 INFO  org.apache.flink.kubernetes.kubeclient.resources.KubernetesPodsWatcher [] - The …
Run Code Online (Sandbox Code Playgroud)

kubernetes apache-flink

3
推荐指数
1
解决办法
3060
查看次数