小编Pie*_*ito的帖子

如何在 Kafka-Spring 中捕获反序列化错误?

我正在启动一个使用 kafka 消息的应用程序。

我遵循Spring-docs关于反序列化错误处理以捕获反序列化异常。我试过 failedDeserializationFunction 方法。

这是我的消费者配置类

@Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);

        /*  Error Handling */
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
        consumerProps.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
        consumerProps.put(ErrorHandlingDeserializer2.VALUE_FUNCTION, FailedNTCMessageBodyProvider.class);

        return consumerProps;
    }

    @Bean
    public ConsumerFactory<String, NTCMessageBody> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
                new JsonDeserializer<>(NTCMessageBody.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, NTCMessageBody> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, NTCMessageBody> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }
Run Code Online (Sandbox Code Playgroud)

这是 BiFunction 提供者

public class FailedNTCMessageBodyProvider implements BiFunction<byte[], …
Run Code Online (Sandbox Code Playgroud)

java spring apache-kafka

5
推荐指数
2
解决办法
2万
查看次数

在Zeppelin上增加Spark Executors

我正在使用Hortnworks(HDP 2.4)建立一个集群.我有一个4节点集群,每个集群都有(16Gb-RAM,8-CPU).我也使用Zeppelin Notebook安装了Spark,以便使用python(pyspark).

我的问题是:我开始使用3个节点的配置,之后我添加了另一个新节点(如前所述总共为4个),无论如何,Spark上的执行器数量仍为"3".

我在网上看到可以设置执行程序的数量SPARK_EXECUTOR_INSTANCES,但是这个参数仅存spark-env template在于Ambari UI中Spark的配置页面中.似乎它要求YARN做出关于遗嘱执行人的决定,但在YARN中,我还没有发现任何关于此的事情.

在此输入图像描述

最后,如何使用Ambari增加Hortonworks Hadoop集群中执行程序的数量?

python hadoop hadoop-yarn apache-spark apache-zeppelin

1
推荐指数
2
解决办法
6028
查看次数