用自己的Executor替换默认的SimpleAsyncTaskExecutor有什么缺点和风险

Jim*_*m C 0 java spring multithreading apache-kafka spring-boot

个人知识:我从javacodegeeks中读到:“... SimpleAsyncTaskExecutor 对于玩具项目来说是可以的,但是对于任何大于它的项目\xe2\x80\x99s 来说有点风险,因为它不限制并发线程并且不重用线程。所以安全,我们还将添加一个任务执行器 bean...”,baeldung提供了一个非常简单的示例,说明如何添加我们自己的任务执行器。但我可以找到任何指导来解释其后果以及一些值得应用的案例。

\n\n

个人愿望:我正在努力为我们的微服务日志提供一个企业架构,以便在 Kafka 主题上发布。对于我基于日志的情况来说,“由于不限制并发线程并且不重用它而导致的风险”这一说法似乎是合理的。

\n\n

我在本地桌面上成功运行了以下代码,但我想知道我是否正确提供了自定义任务执行器。

\n\n

我的问题:考虑到我已经在使用 kafkatempla (即默认情况下同步、单例和线程安全,至少在生成/发送消息方面至少是理解的),下面的配置是否真的朝着正确的方向重用线程并避免意外传播使用 SimpleAsyncTaskExecutor 时创建线程?

\n\n

生产者配置

\n\n
@EnableAsync\n@Configuration\npublic class KafkaProducerConfig {\n\n    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class);\n\n    @Value("${kafka.brokers}")\n    private String servers;\n\n    @Bean\n    public Executor taskExecutor() {\n        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();\n        executor.setCorePoolSize(2);\n        executor.setMaxPoolSize(2);\n        executor.setQueueCapacity(500);\n        executor.setThreadNamePrefix("KafkaMsgExecutor-");\n        executor.initialize();\n        return executor;\n    }\n\n    @Bean\n    public Map<String, Object> producerConfigs() {\n        Map<String, Object> props = new HashMap<>();\n        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);\n        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);\n        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonDeserializer.class);\n        return props;\n    }\n\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n

制片人

\n\n
@Service\npublic class Producer {\n\n    private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);\n\n    @Autowired\n    private KafkaTemplate<String, String> kafkaTemplate;\n\n    @Async\n    public void send(String topic, String message) {\n        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);\n        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {\n\n            @Override\n            public void onSuccess(final SendResult<String, String> message) {\n                LOGGER.info("sent message= " + message + " with offset= " + message.getRecordMetadata().offset());\n            }\n\n            @Override\n            public void onFailure(final Throwable throwable) {\n                LOGGER.error("unable to send message= " + message, throwable);\n            }\n        });\n    }\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n

出于演示目的:

\n\n
@SpringBootApplication\npublic class KafkaDemoApplication  implements CommandLineRunner {\n\n    public static void main(String[] args) {\n        SpringApplication.run(KafkaDemoApplication.class, args);\n\n    }\n\n    @Autowired\n    private Producer p;\n\n    @Override\n    public void run(String... strings) throws Exception {\n        p.send("test", " qualquer messagem demonstrativa");\n    }\n\n}\n
Run Code Online (Sandbox Code Playgroud)\n

Mạn*_*yễn 9

这是默认实现SimpleAsyncTaskExecutor

protected void doExecute(Runnable task) {
    Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
    thread.start();
}
Run Code Online (Sandbox Code Playgroud)

每个任务都会创建新的线程,Java 中的线程创建并不便宜:(参考

线程对象使用大量内存,在大型应用程序中,分配和释放许多线程对象会产生大量内存管理开销。

=> 使用此任务执行器重复执行任务会对应用程序性能产生负面影响(而且,此执行器默认不限制并发任务数)

这就是为什么建议您使用线程池实现,线程创建开销仍然存在,但由于线程被重用而不是创建-触发-忘记而显着减少。

配置时ThreadPoolTaskExecutor,应根据您的应用程序负载正确定义两个值得注意的参数:

  1. private int maxPoolSize = Integer.MAX_VALUE;

    这是池中线程的最大数量。

  2. private int queueCapacity = Integer.MAX_VALUE;

    这是排队任务的最大数量。当队列已满时,默认值可能会导致 OutOfMemory 异常。

使用默认值 ( Integer.MAX_VALUE) 可能会导致服务器资源不足/崩溃。

您可以通过增加最大池大小的数量来提高吞吐量setMaxPoolSize(),为了减少负载增加时的预热,请将核心池大小设置为更高的值(当负载增加时,将初始化setCorePoolSize()任何数量不同的线程)maxPoolSize - corePoolSize