Jim*_*m C 0 java spring multithreading apache-kafka spring-boot
个人知识:我从javacodegeeks中读到:“... SimpleAsyncTaskExecutor 对于玩具项目来说是可以的,但是对于任何大于它的项目\xe2\x80\x99s 来说有点风险,因为它不限制并发线程并且不重用线程。所以安全,我们还将添加一个任务执行器 bean...”,baeldung提供了一个非常简单的示例,说明如何添加我们自己的任务执行器。但我可以找到任何指导来解释其后果以及一些值得应用的案例。
\n\n个人愿望:我正在努力为我们的微服务日志提供一个企业架构,以便在 Kafka 主题上发布。对于我基于日志的情况来说,“由于不限制并发线程并且不重用它而导致的风险”这一说法似乎是合理的。
\n\n我在本地桌面上成功运行了以下代码,但我想知道我是否正确提供了自定义任务执行器。
\n\n我的问题:考虑到我已经在使用 kafkatempla (即默认情况下同步、单例和线程安全,至少在生成/发送消息方面至少是理解的),下面的配置是否真的朝着正确的方向重用线程并避免意外传播使用 SimpleAsyncTaskExecutor 时创建线程?
\n\n生产者配置
\n\n@EnableAsync\n@Configuration\npublic class KafkaProducerConfig {\n\n private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class);\n\n @Value("${kafka.brokers}")\n private String servers;\n\n @Bean\n public Executor taskExecutor() {\n ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();\n executor.setCorePoolSize(2);\n executor.setMaxPoolSize(2);\n executor.setQueueCapacity(500);\n executor.setThreadNamePrefix("KafkaMsgExecutor-");\n executor.initialize();\n return executor;\n }\n\n @Bean\n public Map<String, Object> producerConfigs() {\n Map<String, Object> props = new HashMap<>();\n props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);\n props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);\n props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonDeserializer.class);\n return props;\n }\n\n}\nRun Code Online (Sandbox Code Playgroud)\n\n制片人
\n\n@Service\npublic class Producer {\n\n private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);\n\n @Autowired\n private KafkaTemplate<String, String> kafkaTemplate;\n\n @Async\n public void send(String topic, String message) {\n ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);\n future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {\n\n @Override\n public void onSuccess(final SendResult<String, String> message) {\n LOGGER.info("sent message= " + message + " with offset= " + message.getRecordMetadata().offset());\n }\n\n @Override\n public void onFailure(final Throwable throwable) {\n LOGGER.error("unable to send message= " + message, throwable);\n }\n });\n }\n}\nRun Code Online (Sandbox Code Playgroud)\n\n出于演示目的:
\n\n@SpringBootApplication\npublic class KafkaDemoApplication implements CommandLineRunner {\n\n public static void main(String[] args) {\n SpringApplication.run(KafkaDemoApplication.class, args);\n\n }\n\n @Autowired\n private Producer p;\n\n @Override\n public void run(String... strings) throws Exception {\n p.send("test", " qualquer messagem demonstrativa");\n }\n\n}\nRun Code Online (Sandbox Code Playgroud)\n
这是默认实现SimpleAsyncTaskExecutor
protected void doExecute(Runnable task) {
Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
thread.start();
}
Run Code Online (Sandbox Code Playgroud)
每个任务都会创建新的线程,Java 中的线程创建并不便宜:(参考)
线程对象使用大量内存,在大型应用程序中,分配和释放许多线程对象会产生大量内存管理开销。
=> 使用此任务执行器重复执行任务会对应用程序性能产生负面影响(而且,此执行器默认不限制并发任务数)
这就是为什么建议您使用线程池实现,线程创建开销仍然存在,但由于线程被重用而不是创建-触发-忘记而显着减少。
配置时ThreadPoolTaskExecutor,应根据您的应用程序负载正确定义两个值得注意的参数:
private int maxPoolSize = Integer.MAX_VALUE;
这是池中线程的最大数量。
private int queueCapacity = Integer.MAX_VALUE;
这是排队任务的最大数量。当队列已满时,默认值可能会导致 OutOfMemory 异常。
使用默认值 ( Integer.MAX_VALUE) 可能会导致服务器资源不足/崩溃。
您可以通过增加最大池大小的数量来提高吞吐量setMaxPoolSize(),为了减少负载增加时的预热,请将核心池大小设置为更高的值(当负载增加时,将初始化setCorePoolSize()任何数量不同的线程)maxPoolSize - corePoolSize
| 归档时间: |
|
| 查看次数: |
3985 次 |
| 最近记录: |