我正在启动一个使用 kafka 消息的应用程序。
我遵循Spring-docs关于反序列化错误处理以捕获反序列化异常。我试过 failedDeserializationFunction 方法。
这是我的消费者配置类
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> consumerProps = new HashMap<>();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
/* Error Handling */
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
consumerProps.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
consumerProps.put(ErrorHandlingDeserializer2.VALUE_FUNCTION, FailedNTCMessageBodyProvider.class);
return consumerProps;
}
@Bean
public ConsumerFactory<String, NTCMessageBody> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
new JsonDeserializer<>(NTCMessageBody.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, NTCMessageBody> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, NTCMessageBody> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
Run Code Online (Sandbox Code Playgroud)
这是 BiFunction 提供者
public class FailedNTCMessageBodyProvider implements BiFunction<byte[], …Run Code Online (Sandbox Code Playgroud) 我正在使用Hortnworks(HDP 2.4)建立一个集群.我有一个4节点集群,每个集群都有(16Gb-RAM,8-CPU).我也使用Zeppelin Notebook安装了Spark,以便使用python(pyspark).
我的问题是:我开始使用3个节点的配置,之后我添加了另一个新节点(如前所述总共为4个),无论如何,Spark上的执行器数量仍为"3".
我在网上看到可以设置执行程序的数量SPARK_EXECUTOR_INSTANCES,但是这个参数仅存spark-env template在于Ambari UI中Spark的配置页面中.似乎它要求YARN做出关于遗嘱执行人的决定,但在YARN中,我还没有发现任何关于此的事情.
最后,如何使用Ambari增加Hortonworks Hadoop集群中执行程序的数量?