我正在尝试使用 Kafka Source 和 Sink 测试 Flink 的一次性语义:
我希望在输出主题中看到单调递增的整数,无论 TaskManager 终止和恢复。
但实际上在控制台消费者输出中看到了一些意想不到的东西:
32
33
34
35
36
37
38
39
40
-- TaskManagerKilled
32
34
35
36
40
41
46
31
33
37
38
39
42
43
44
45
Run Code Online (Sandbox Code Playgroud)
看起来检查点之间的所有消息都在输出主题中重播。这应该是正确的行为还是我做错了什么?
恢复了一张快照: Flink UI
我的弗林克代码:
32
33
34
35
36
37
38
39
40
-- TaskManagerKilled
32
34
35
36
40
41
46
31
33
37
38 …Run Code Online (Sandbox Code Playgroud) 我是 flink 的新手,对状态后端配置有一些困惑。
据我所知,RocksDB 将应用程序的所有状态保存在文件系统上。我使用 s3 来存储状态,因此我将state.checkpoints.dir和state.savepoints.dir都配置为指向我的 s3 存储桶。现在我看到还有另一个与 RocksDB 存储相关的选项,名为state.backend.rocksdb.localdir。这是什么目的?(我看到我不能使用 s3)另外,如果 RocksDB 使用本地机器存储来做某事,当我使用 Kubernetes 并且我的 pod 突然失败时会怎么样?我应该使用持久存储吗?
另一件事,我不确定我是否正确理解了所有国家的事情。检查点是否保存了我的所有状态?例如,当我使用 AggregationFunction 并且应用程序失败时,当应用程序恢复时,每个键的聚合值是否会恢复?
我的本地设置包括local apache-flink(通过brew安装)并localstack运行Kinesis服务。
我的 docker-compose 有
localstack:
image: localstack/localstack:0.10.7
environment:
- SERVICES=kinesis
ports:
- "4568:4568"
Run Code Online (Sandbox Code Playgroud)
和我的 Kinesis 消费者:
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "123");
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "123");
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ENDPOINT, "http://localhost:4568");
Run Code Online (Sandbox Code Playgroud)
但是当我运行 Flink 程序时,出现以下错误:
引起原因:org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.AmazonKinesisException:null(服务:AmazonKinesis;状态代码:502;错误代码:null;请求 ID:null)
仅在使用时才会发生localstack。如果我连接到我的 AWS 账户上的 Kinesis 流,它会完美运行。
我正在尝试实现一个类,该类使用户能够操作 N 个输入流,而不受输入流类型的限制。
首先,我想将所有输入数据流转换为 keyedStreams。因此,我将输入数据流映射到元组中,然后应用 KeyBy 将其转换为密钥流。
我总是遇到序列化问题,我尝试遵循本指南https://ci.apache.org/projects/flink/flink-docs-stable/dev/java_lambdas.html但它不起作用。
我想知道的是:
非常感谢。
主要类别:
public class CEP {
private Integer streamsIdComp = 0;
final private Map<Integer, DataStream<?> > dataStreams = new HashMap<>();
final private Map<Integer, TypeInformation<?>> dataStreamsTypes = new HashMap<>();
public <T> KeyedStream<Tuple2<Integer, T>, Integer> converttoKeyedStream(DataStream<T> inputStream){
Preconditions.checkNotNull(inputStream, "dataStream");
TypeInformation<T> streamType = inputStream.getType();
KeyedStream<Tuple2<Integer,T>,Integer> keyedInputStream = inputStream.
map(new MapFunction<T, Tuple2<Integer,T>>() {
@Override
public Tuple2<Integer, T> map(T value) throws Exception {
return Tuple2.of(streamsIdComp, value);
}
}). …Run Code Online (Sandbox Code Playgroud) 在 Apache Flink 系统架构中,我们有 Client 进程、master 进程(JobManager)、worker 进程(TaskManager)的概念。
上面的每个进程基本上都是一个JVM进程。TaskManager 执行单独的任务,每个任务都在一个线程中执行。因此,管理器到进程或任务到线程的映射是清晰的。
任务管理器中的插槽怎么样?插槽映射到什么?
根据我对Flink的理解,它引入了基于键(键组)的并行性。然而,假设一个人有大量未加密的流并且希望并行完成工作,那么实现这一目标的最佳方法是什么?
如果流有一些字段,人们可能会考虑任意地使用其中一个字段进行键控,但这并不能保证工作负载能够正确平衡。例如,因为该字段中的一个值可能出现在 90% 的消息中。因此我的问题是:
我能想到的一个可能的解决方案是为每条消息分配一个随机数(如果您希望并行度为 3,则为 1-3;如果您希望并行度更灵活,则为 1-1000)。然而,我想知道这是否是推荐的方法,因为它感觉不太优雅。
我正在尝试将此处felipeogutierrez/explore-flink:1.11.1-scala_2.12提供的图像使用到 kubernetes 集群配置中,就像此处所说的那样。我使用 maven 编译我的项目https://github.com/felipegutierrez/explore-flink并使用以下命令扩展默认的 flink 映像:flink:1.11.1-scala_2.12Dockerfile
FROM maven:3.6-jdk-8-slim AS builder
# get explore-flink job and compile it
COPY ./java/explore-flink /opt/explore-flink
WORKDIR /opt/explore-flink
RUN mvn clean install
FROM flink:1.11.1-scala_2.12
WORKDIR /opt/flink/usrlib
COPY --from=builder /opt/explore-flink/target/explore-flink.jar /opt/flink/usrlib/explore-flink.jar
ADD /opt/flink/usrlib/explore-flink.jar /opt/flink/usrlib/explore-flink.jar
#USER flink
Run Code Online (Sandbox Code Playgroud)
然后教程2说创建公共集群组件:
kubectl create -f k8s/flink-configuration-configmap.yaml
kubectl create -f k8s/jobmanager-service.yaml
kubectl proxy
kubectl create -f k8s/jobmanager-rest-service.yaml
kubectl get svc flink-jobmanager-rest
Run Code Online (Sandbox Code Playgroud)
然后创建jobmanager-job.yaml:
kubectl create -f k8s/jobmanager-job.yaml
Run Code Online (Sandbox Code Playgroud)
我在 podCrashLoopBackOff …
在深入研究了许多 SO 帖子甚至 JIRA 问题后,我不知道该去哪里了。Flink 中的每个检查点都会因超时而失败,在作业的异常部分中,它显示以下错误,但作业本身不会失败:
org.apache.kafka.common.errors.TimeoutException: Timeout expired after 600000milliseconds while awaiting InitProducerId
Run Code Online (Sandbox Code Playgroud)
禁用检查点时,有关 Kafka 的所有内容都会按预期工作,因此我的假设是它可能与等待 Kafka 提交以便被确认的检查点有关(Semantic设置为EXACTLY_ONCE)。我记得读过有关超时不匹配导致问题的文章,因此我将TRANSACTION_TIMEOUT_CONFIGFlinkKafkaProducer 中的 调整为900000毫秒。
我还按照本期中的建议调整了 TransactionTimeout 和 MaxBlockMS,该期目前有很多关于这个完全相同的错误的讨论,但显然没有解决方案。
Flink 书《Stream Handling with Apache Flink》建议仔细修改 Kafka 配置,例如acks、log.flush.interval.messages、log.flush.interval.ms和log.flush.*。我们已经在 Flink 1.9.1 下使用了这个功能,但自从我们升级到 1.11.1 后,它就不再工作了。我不知道是否有人同时接触过 Kafka 设置,但据我所知,除了log.flush.interval=10000. 我们像以前一样使用 Confluence 5.3.3,这意味着 Kafka 2.3.1。
此外,Flink 作业部署在单节点环境中,因此它应该能够访问文件系统,整个目录由运行 Flink 服务的用户拥有(这是另一个 SO 线程中建议的解决方案)。
有人知道是什么原因导致这些检查点失败吗?
我尝试从 PyFlink 和 Kafka 开始,但出现以下错误。
感谢您的支持 !
安装
python -m pip install apache-flink
pip install pyFlink
Run Code Online (Sandbox Code Playgroud)
代码
from pyFlink.datastream import StreamExecutionEnvironment
Run Code Online (Sandbox Code Playgroud)
错误
ModuleNotFoundError: No module named 'pyFlink'
Run Code Online (Sandbox Code Playgroud) Flink作业失败,错误信息如下
2020-12-02 09:37:27
java.util.concurrent.CompletionException: java.lang.reflect.UndeclaredThrowableException
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.reflect.UndeclaredThrowableException
at com.sun.proxy.$Proxy41.submitTask(Unknown Source)
at org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.submitTask(RpcTaskManagerGateway.java:77)
at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$9(Execution.java:735)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
... 7 more
Caused by: java.io.IOException: The rpc invocation size exceeds the maximum akka framesize.
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:270)
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:200)
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:129)
... 11 more
Run Code Online (Sandbox Code Playgroud)
这个作业的逻辑很简单,Kafka的消费数据保存到Clickhouse中。
启动命令
2020-12-02 09:37:27
java.util.concurrent.CompletionException: java.lang.reflect.UndeclaredThrowableException
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at …Run Code Online (Sandbox Code Playgroud) apache-flink ×10
apache-kafka ×2
docker ×1
error-code ×1
java ×1
kubernetes ×1
localstack ×1
pyflink ×1
rocksdb ×1