群集中的Apache Flink流不会与工作人员分离作业

Sud*_*kar 7 streaming cluster-computing apache-kafka apache-flink

我的目标是使用Kafka作为源和Flink作为流处理引擎来设置高吞吐量集群.这就是我所做的.

我在主服务器和从服务器上设置了以下配置的双节点集群.

主flink-conf.yaml

jobmanager.rpc.address: <MASTER_IP_ADDR> #localhost

jobmanager.rpc.port: 6123

jobmanager.heap.mb: 256

taskmanager.heap.mb: 512

taskmanager.numberOfTaskSlots: 50

parallelism.default: 100
Run Code Online (Sandbox Code Playgroud)

奴隶flink-conf.yaml

jobmanager.rpc.address: <MASTER_IP_ADDR> #localhost

jobmanager.rpc.port: 6123

jobmanager.heap.mb: 512 #256

taskmanager.heap.mb: 1024 #512

taskmanager.numberOfTaskSlots: 50

parallelism.default: 100
Run Code Online (Sandbox Code Playgroud)

主节点上的从属文件如下所示:

<SLAVE_IP_ADDR>
localhost
Run Code Online (Sandbox Code Playgroud)

两个节点上的flink设置位于具有相同名称的文件夹中.我通过运行在master上启动集群

bin/start-cluster-streaming.sh
Run Code Online (Sandbox Code Playgroud)

这将启动从属节点上的任务管理器.

我的输入源是Kafka.这是片段.

final StreamExecutionEnvironment env = 
    StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource<String> stream = 
    env.addSource(
    new KafkaSource<String>(kafkaUrl,kafkaTopic, new SimpleStringSchema()));
stream.addSink(stringSinkFunction);

env.execute("Kafka stream");
Run Code Online (Sandbox Code Playgroud)

这是我的Sink功能

public class MySink implements SinkFunction<String> {

    private static final long serialVersionUID = 1L;

    public void invoke(String arg0) throws Exception {
        processMessage(arg0);
        System.out.println("Processed Message");
    }
}
Run Code Online (Sandbox Code Playgroud)

这是我的pom.xml中的Flink依赖项.

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-core</artifactId>
    <version>0.9.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>0.9.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>0.9.0</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)

然后我在master上运行带有此命令的打包jar

bin/flink run flink-test-jar-with-dependencies.jar
Run Code Online (Sandbox Code Playgroud)

但是,当我将消息插入Kafka主题时,我能够SinkFunction仅在主节点上考虑来自我的Kafka主题的所有消息(通过我的实现的调用方法中的调试消息).

在作业管理器UI中,我可以看到2个任务管理器,如下所示: 作业管理器仪表板 - 任务管理器

仪表板也是如此: 在此输入图像描述 问题:

  1. 为什么从属节点没有完成任务?
  2. 我错过了一些配置吗?

Til*_*ann 12

从Flink中的Kafka源读取时,源任务的最大并行度受给定Kafka主题的分区数限制.Kafka分区是Flink中源任务可以使用的最小单元.如果分区多于源任务,则某些任务将使用多个分区.

因此,为了向所有100个任务提供输入,您应该确保您的Kafka主题至少有100个分区.

如果您无法更改主题的分区数,则最初也可以使用该setParallelism方法使用较低程度的并行度从Kafka读取.或者,您可以使用rebalance将在前面操作的所有可用任务中混洗数据的方法.