标签: apache-flink

Flink Kafka Producer 中的 Exactly-once 语义

我正在尝试使用 Kafka Source 和 Sink 测试 Flink 的一次性语义:

  1. 运行 flink 应用程序,只需将消息从一个主题传输到另一个主题,并行度=1,检查点间隔 20 秒
  2. 使用 Python 脚本每 2 秒生成递增整数的消息。
  3. 使用 read_commissed 隔离级别的控制台使用者读取输出主题。
  4. 手动杀死任务管理器

我希望在输出主题中看到单调递增的整数,无论​​ TaskManager 终止和恢复。

但实际上在控制台消费者输出中看到了一些意想不到的东西:

32
33
34
35
36
37
38
39
40
-- TaskManagerKilled
32
34
35
36
40
41
46
31
33
37
38
39
42
43
44
45
Run Code Online (Sandbox Code Playgroud)

看起来检查点之间的所有消息都在输出主题中重播。这应该是正确的行为还是我做错了什么?

恢复了一张快照: Flink UI

我的弗林克代码:

32
33
34
35
36
37
38
39
40
-- TaskManagerKilled
32
34
35
36
40
41
46
31
33
37
38 …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-flink

2
推荐指数
1
解决办法
2989
查看次数

Flink - RocksDB 中的 localdir 配置是什么?

我是 flink 的新手,对状态后端配置有一些困惑。

据我所知,RocksDB 将应用程序的所有状态保存在文件系统上。我使用 s3 来存储状态,因此我将state.checkpoints.dirstate.savepoints.dir都配置为指向我的 s3 存储桶。现在我看到还有另一个与 RocksDB 存储相关的选项,名为state.backend.rocksdb.localdir。这是什么目的?(我看到我不能使用 s3)另外,如果 RocksDB 使用本地机器存储来做某事,当我使用 Kubernetes 并且我的 pod 突然失败时会怎么样?我应该使用持久存储吗?

另一件事,我不确定我是否正确理解了所有国家的事情。检查点是否保存了我的所有状态?例如,当我使用 AggregationFunction 并且应用程序失败时,当应用程序恢复时,每个键的聚合值是否会恢复?

stream-processing rocksdb apache-flink

2
推荐指数
1
解决办法
1468
查看次数

获取服务:AmazonKinesis;状态代码:502(使用 apache-flink 和 localstack Kinesis)

我的本地设置包括local apache-flink(通过brew安装)并localstack运行Kinesis服务。

我的 docker-compose 有

  localstack:
    image: localstack/localstack:0.10.7
    environment:
      - SERVICES=kinesis
    ports:
      - "4568:4568"
Run Code Online (Sandbox Code Playgroud)

和我的 Kinesis 消费者:

kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "123");
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "123");
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ENDPOINT, "http://localhost:4568");
Run Code Online (Sandbox Code Playgroud)

但是当我运行 Flink 程序时,出现以下错误:

引起原因:org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.AmazonKinesisException:null(服务:AmazonKinesis;状态代码:502;错误代码:null;请求 ID:null)

仅在使用时才会发生localstack。如果我连接到我的 AWS 账户上的 Kinesis 流,它会完美运行。

error-code apache-flink localstack

2
推荐指数
1
解决办法
1846
查看次数

MapFunction 的实现不可序列化 Flink

我正在尝试实现一个类,该类使用户能够操作 N 个输入流,而不受输入流类型的限制。

首先,我想将所有输入数据流转换为 keyedStreams。因此,我将输入数据流映射到元组中,然后应用 KeyBy 将其转换为密钥流。

我总是遇到序列化问题,我尝试遵循本指南https://ci.apache.org/projects/flink/flink-docs-stable/dev/java_lambdas.html但它不起作用。

我想知道的是:

  1. Java 中的序列化/反序列化是什么?以及用途。
  2. 在 Flink 中通过序列化可以解决哪些问题
  3. 我的代码有什么问题(您可以在代码和错误消息下面找到)

非常感谢。

主要类别:

public class CEP {

private  Integer streamsIdComp = 0;
final  private Map<Integer, DataStream<?> > dataStreams = new HashMap<>();
final  private Map<Integer, TypeInformation<?>> dataStreamsTypes = new HashMap<>();

public <T> KeyedStream<Tuple2<Integer, T>, Integer> converttoKeyedStream(DataStream<T> inputStream){

    Preconditions.checkNotNull(inputStream, "dataStream");
    TypeInformation<T> streamType = inputStream.getType();

    KeyedStream<Tuple2<Integer,T>,Integer> keyedInputStream = inputStream.
            map(new MapFunction<T, Tuple2<Integer,T>>() {
                @Override
                public Tuple2<Integer, T> map(T value) throws Exception {
                    return Tuple2.of(streamsIdComp, value);
                }
            }). …
Run Code Online (Sandbox Code Playgroud)

java serialization apache-flink flink-streaming

2
推荐指数
1
解决办法
6077
查看次数

Flink 任务管理器中的槽是什么?

在 Apache Flink 系统架构中,我们有 Client 进程、master 进程(JobManager)、worker 进程(TaskManager)的概念。

上面的每个进程基本上都是一个JVM进程。TaskManager 执行单独的任务,每个任务都在一个线程中执行。因此,管理器到进程或任务到线程的映射是清晰的。

任务管理器中的插槽怎么样?插槽映射到什么?

apache-flink

2
推荐指数
1
解决办法
1849
查看次数

在无密钥的 Flink 流中实施良好平衡的并行性

根据我对Flink的理解,它引入了基于键(键组)的并行性。然而,假设一个人有大量未加密的流并且希望并行完成工作,那么实现这一目标的最佳方法是什么?

如果流有一些字段,人们可能会考虑任意地使用其中一个字段进行键控,但这并不能保证工作负载能够正确平衡。例如,因为该字段中的一个值可能出现在 90% 的消息中。因此我的问题是:

如何在不事先了解流中内容的情况下在 Flink 中实施良好平衡的并行性


我能想到的一个可能的解决方案是为每条消息分配一个随机数(如果您希望并行度为 3,则为 1-3;如果您希望并行度更灵活,则为 1-1000)。然而,我想知道这是否是推荐的方法,因为它感觉不太优雅。

parallel-processing apache-flink flink-streaming

2
推荐指数
1
解决办法
922
查看次数

如何从 Kubernetes 内的 flink docker 镜像启动我的 jar 应用程序?

我正在尝试将此处felipeogutierrez/explore-flink:1.11.1-scala_2.12提供的图像使用到 kubernetes 集群配置中,就像此处所说的那样。我使用 maven 编译我的项目https://github.com/felipegutierrez/explore-flink并使用以下命令扩展默认的 flink 映像:flink:1.11.1-scala_2.12Dockerfile

FROM maven:3.6-jdk-8-slim AS builder
# get explore-flink job and compile it
COPY ./java/explore-flink /opt/explore-flink
WORKDIR /opt/explore-flink
RUN mvn clean install

FROM flink:1.11.1-scala_2.12
WORKDIR /opt/flink/usrlib
COPY --from=builder /opt/explore-flink/target/explore-flink.jar /opt/flink/usrlib/explore-flink.jar
ADD /opt/flink/usrlib/explore-flink.jar /opt/flink/usrlib/explore-flink.jar
#USER flink
Run Code Online (Sandbox Code Playgroud)

然后教程2说创建公共集群组件:

kubectl create -f k8s/flink-configuration-configmap.yaml
kubectl create -f k8s/jobmanager-service.yaml
kubectl proxy
kubectl create -f k8s/jobmanager-rest-service.yaml
kubectl get svc flink-jobmanager-rest
Run Code Online (Sandbox Code Playgroud)

然后创建jobmanager-job.yaml

kubectl create -f k8s/jobmanager-job.yaml
Run Code Online (Sandbox Code Playgroud)

我在 podCrashLoopBackOff …

docker kubernetes apache-flink

2
推荐指数
1
解决办法
3146
查看次数

Flink 检查点不断失败(等待 InitProducerId 时超时)

在深入研究了许多 SO 帖子甚至 JIRA 问题后,我不知道该去哪里了。Flink 中的每个检查点都会因超时而失败,在作业的异常部分中,它显示以下错误,但作业本身不会失败:

org.apache.kafka.common.errors.TimeoutException: Timeout expired after 600000milliseconds while awaiting InitProducerId
Run Code Online (Sandbox Code Playgroud)

禁用检查点时,有关 Kafka 的所有内容都会按预期工作,因此我的假设是它可能与等待 Kafka 提交以便被确认的检查点有关(Semantic设置为EXACTLY_ONCE)。我记得读过有关超时不匹配导致问题的文章,因此我将TRANSACTION_TIMEOUT_CONFIGFlinkKafkaProducer 中的 调整为900000毫秒。

我还按照本期中的建议调整了 TransactionTimeout 和 MaxBlockMS,该期目前有很多关于这个完全相同的错误的讨论,但显然没有解决方案。

Flink 书《Stream Handling with Apache Flink》建议仔细修改 Kafka 配置,例如ackslog.flush.interval.messageslog.flush.interval.mslog.flush.*。我们已经在 Flink 1.9.1 下使用了这个功能,但自从我们升级到 1.11.1 后,它就不再工作了。我不知道是否有人同时接触过 Kafka 设置,但据我所知,除了log.flush.interval=10000. 我们像以前一样使用 Confluence 5.3.3,这意味着 Kafka 2.3.1

此外,Flink 作业部署在单节点环境中,因此它应该能够访问文件系统,整个目录由运行 Flink 服务的用户拥有(这是另一个 SO 线程中建议的解决方案)。

有人知道是什么原因导致这些检查点失败吗?

apache-kafka apache-flink

2
推荐指数
1
解决办法
4897
查看次数

PyFlink - Kafka - 缺少模块

我尝试从 PyFlink 和 Kafka 开始,但出现以下错误。

感谢您的支持 !

安装

python -m pip install apache-flink
pip install pyFlink 
Run Code Online (Sandbox Code Playgroud)

代码

from pyFlink.datastream import StreamExecutionEnvironment
Run Code Online (Sandbox Code Playgroud)

错误

ModuleNotFoundError: No module named 'pyFlink'
Run Code Online (Sandbox Code Playgroud)

apache-flink pyflink

2
推荐指数
1
解决办法
2813
查看次数

Flink作业失败,原因为:java.io.IOException:rpc调用大小超出最大akka帧大小

Flink作业失败,错误信息如下

2020-12-02 09:37:27
java.util.concurrent.CompletionException: java.lang.reflect.UndeclaredThrowableException
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.reflect.UndeclaredThrowableException
    at com.sun.proxy.$Proxy41.submitTask(Unknown Source)
    at org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.submitTask(RpcTaskManagerGateway.java:77)
    at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$9(Execution.java:735)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    ... 7 more
Caused by: java.io.IOException: The rpc invocation size exceeds the maximum akka framesize.
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:270)
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:200)
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:129)
    ... 11 more

Run Code Online (Sandbox Code Playgroud)

这个作业的逻辑很简单,Kafka的消费数据保存到Clickhouse中。

启动命令

2020-12-02 09:37:27
java.util.concurrent.CompletionException: java.lang.reflect.UndeclaredThrowableException
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at …
Run Code Online (Sandbox Code Playgroud)

apache-flink

2
推荐指数
1
解决办法
2655
查看次数