我的目标是使用Kafka作为源和Flink作为流处理引擎来设置高吞吐量集群.这就是我所做的.
我在主服务器和从服务器上设置了以下配置的双节点集群.
主flink-conf.yaml
jobmanager.rpc.address: <MASTER_IP_ADDR> #localhost
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 256
taskmanager.heap.mb: 512
taskmanager.numberOfTaskSlots: 50
parallelism.default: 100
Run Code Online (Sandbox Code Playgroud)
奴隶flink-conf.yaml
jobmanager.rpc.address: <MASTER_IP_ADDR> #localhost
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 512 #256
taskmanager.heap.mb: 1024 #512
taskmanager.numberOfTaskSlots: 50
parallelism.default: 100
Run Code Online (Sandbox Code Playgroud)
主节点上的从属文件如下所示:
<SLAVE_IP_ADDR>
localhost
Run Code Online (Sandbox Code Playgroud)
两个节点上的flink设置位于具有相同名称的文件夹中.我通过运行在master上启动集群
bin/start-cluster-streaming.sh
Run Code Online (Sandbox Code Playgroud)
这将启动从属节点上的任务管理器.
我的输入源是Kafka.这是片段.
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> stream =
env.addSource(
new KafkaSource<String>(kafkaUrl,kafkaTopic, new SimpleStringSchema()));
stream.addSink(stringSinkFunction);
env.execute("Kafka stream");
Run Code Online (Sandbox Code Playgroud)
这是我的Sink功能
public class MySink implements SinkFunction<String> {
private static final long serialVersionUID = 1L;
public void invoke(String arg0) throws Exception {
processMessage(arg0);
System.out.println("Processed Message");
} …Run Code Online (Sandbox Code Playgroud) 是否可以使用 Helm 来部署 Argo 工作流程?当我执行以下操作时,我收到如下错误helm install
错误:升级失败:(workflows/templates/my_dag.yaml:47)处解析错误:函数“workflow”未定义
yaml 本身具有 Argo 和 Helm 插值{{..}}。我明白为什么会失败。有没有解决的办法?
我已经看过这个,但它看起来不像我想做的事情,因为它改变了语法。