自 Spring Boot 2.6.X 以来,EmbeddedKafka 失败:AccessDeniedException: ..\AppData\Local\Temp\spring.kafka*

DRo*_*elt 13 spring-kafka spring-kafka-test

e:这已通过 Spring Boot 2.6.5 修复(请参阅https://github.com/spring-projects/spring-boot/issues/30243

自从升级到 Spring Boot 2.6.X(在我的例子中:2.6.1)以来,我有多个项目现在在 Windows 上的单元测试失败,无法启动EmbeddedKafka,但在 Linux 上运行

有多个错误,但这是抛出的第一个错误

...
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.6.1)

2021-12-09 16:15:00.300  INFO 13864 --- [           main] k.utils.Log4jControllerRegistration$     : Registered kafka:type=kafka.Log4jController MBean
2021-12-09 16:15:00.420  INFO 13864 --- [           main] o.a.zookeeper.server.ZooKeeperServer     : 
2021-12-09 16:15:00.420  INFO 13864 --- [           main] o.a.zookeeper.server.ZooKeeperServer     :   ______                  _                                          
2021-12-09 16:15:00.420  INFO 13864 --- [           main] o.a.zookeeper.server.ZooKeeperServer     :  |___  /                 | |                                         
2021-12-09 16:15:00.420  INFO 13864 --- [           main] o.a.zookeeper.server.ZooKeeperServer     :     / /    ___     ___   | | __   ___    ___   _ __     ___   _ __   
2021-12-09 16:15:00.420  INFO 13864 --- [           main] o.a.zookeeper.server.ZooKeeperServer     :    / /    / _ \   / _ \  | |/ /  / _ \  / _ \ | '_ \   / _ \ | '__|
2021-12-09 16:15:00.420  INFO 13864 --- [           main] o.a.zookeeper.server.ZooKeeperServer     :   / /__  | (_) | | (_) | |   <  |  __/ |  __/ | |_) | |  __/ | |    
2021-12-09 16:15:00.420  INFO 13864 --- [           main] o.a.zookeeper.server.ZooKeeperServer     :  /_____|  \___/   \___/  |_|\_\  \___|  \___| | .__/   \___| |_|
2021-12-09 16:15:00.420  INFO 13864 --- [           main] o.a.zookeeper.server.ZooKeeperServer     :                                               | |                     
2021-12-09 16:15:00.420  INFO 13864 --- [           main] o.a.zookeeper.server.ZooKeeperServer     :                                               |_|                     
2021-12-09 16:15:00.420  INFO 13864 --- [           main] o.a.zookeeper.server.ZooKeeperServer     : 
2021-12-09 16:15:00.422  INFO 13864 --- [           main] o.a.zookeeper.server.ZooKeeperServer     : Server environment:zookeeper.version=3.6.3--6401e4ad2087061bc6b9f80dec2d69f2e3c8660a, built on 04/08/2021 16:35 GMT
2021-12-09 16:15:00.422  INFO 13864 --- [           main] o.a.zookeeper.server.ZooKeeperServer     : Server environment:host.name=host.docker.internal
2021-12-09 16:15:00.422  INFO 13864 --- [           main] o.a.zookeeper.server.ZooKeeperServer     : Server environment:java.version=11.0.11
2021-12-09 16:15:00.422  INFO 13864 --- [           main] o.a.zookeeper.server.ZooKeeperServer     : Server environment:java.vendor=AdoptOpenJDK
...
2021-12-09 16:15:01.015  INFO 13864 --- [nelReaper-Fetch] lientQuotaManager$ThrottledChannelReaper : [ThrottledChannelReaper-Fetch]: Starting
2021-12-09 16:15:01.015  INFO 13864 --- [lReaper-Produce] lientQuotaManager$ThrottledChannelReaper : [ThrottledChannelReaper-Produce]: Starting
2021-12-09 16:15:01.016  INFO 13864 --- [lReaper-Request] lientQuotaManager$ThrottledChannelReaper : [ThrottledChannelReaper-Request]: Starting
2021-12-09 16:15:01.017  INFO 13864 --- [trollerMutation] lientQuotaManager$ThrottledChannelReaper : [ThrottledChannelReaper-ControllerMutation]: Starting
2021-12-09 16:15:01.037  INFO 13864 --- [           main] kafka.log.LogManager                     : Loading logs from log dirs ArraySeq(C:\Users\ddrop\AppData\Local\Temp\spring.kafka.bf8e2b62-a1f2-4092-b292-a15e35bd31ad18378079390566696446)
2021-12-09 16:15:01.040  INFO 13864 --- [           main] kafka.log.LogManager                     : Attempting recovery for all logs in C:\Users\ddrop\AppData\Local\Temp\spring.kafka.bf8e2b62-a1f2-4092-b292-a15e35bd31ad18378079390566696446 since no clean shutdown file was found
2021-12-09 16:15:01.043  INFO 13864 --- [           main] kafka.log.LogManager                     : Loaded 0 logs in 6ms.
2021-12-09 16:15:01.043  INFO 13864 --- [           main] kafka.log.LogManager                     : Starting log cleanup with a period of 300000 ms.
2021-12-09 16:15:01.045  INFO 13864 --- [           main] kafka.log.LogManager                     : Starting log flusher with a default period of 9223372036854775807 ms.
2021-12-09 16:15:01.052  INFO 13864 --- [           main] kafka.log.LogCleaner                     : Starting the log cleaner
2021-12-09 16:15:01.059  INFO 13864 --- [leaner-thread-0] kafka.log.LogCleaner                     : [kafka-log-cleaner-thread-0]: Starting
2021-12-09 16:15:01.224  INFO 13864 --- [name=forwarding] k.s.BrokerToControllerRequestThread      : [BrokerToControllerChannelManager broker=0 name=forwarding]: Starting
2021-12-09 16:15:01.325  INFO 13864 --- [           main] kafka.network.ConnectionQuotas           : Updated connection-accept-rate max connection creation rate to 2147483647
2021-12-09 16:15:01.327  INFO 13864 --- [           main] kafka.network.Acceptor                   : Awaiting socket connections on localhost:63919.
2021-12-09 16:15:01.345  INFO 13864 --- [           main] kafka.network.SocketServer               : [SocketServer listenerType=ZK_BROKER, nodeId=0] Created data-plane acceptor and processors for endpoint : ListenerName(PLAINTEXT)
2021-12-09 16:15:01.350  INFO 13864 --- [0 name=alterIsr] k.s.BrokerToControllerRequestThread      : [BrokerToControllerChannelManager broker=0 name=alterIsr]: Starting
2021-12-09 16:15:01.364  INFO 13864 --- [eaper-0-Produce] perationPurgatory$ExpiredOperationReaper : [ExpirationReaper-0-Produce]: Starting
2021-12-09 16:15:01.364  INFO 13864 --- [nReaper-0-Fetch] perationPurgatory$ExpiredOperationReaper : [ExpirationReaper-0-Fetch]: Starting
2021-12-09 16:15:01.365  INFO 13864 --- [0-DeleteRecords] perationPurgatory$ExpiredOperationReaper : [ExpirationReaper-0-DeleteRecords]: Starting
2021-12-09 16:15:01.365  INFO 13864 --- [r-0-ElectLeader] perationPurgatory$ExpiredOperationReaper : [ExpirationReaper-0-ElectLeader]: Starting
2021-12-09 16:15:01.374  INFO 13864 --- [rFailureHandler] k.s.ReplicaManager$LogDirFailureHandler  : [LogDirFailureHandler]: Starting
2021-12-09 16:15:01.390  INFO 13864 --- [           main] kafka.zk.KafkaZkClient                   : Creating /brokers/ids/0 (is it secure? false)
2021-12-09 16:15:01.400  INFO 13864 --- [           main] kafka.zk.KafkaZkClient                   : Stat of the created znode at /brokers/ids/0 is: 25,25,1639062901396,1639062901396,1,0,0,72059919267528704,204,0,25

2021-12-09 16:15:01.400  INFO 13864 --- [           main] kafka.zk.KafkaZkClient                   : Registered broker 0 at path /brokers/ids/0 with addresses: PLAINTEXT://localhost:63919, czxid (broker epoch): 25
2021-12-09 16:15:01.410 ERROR 13864 --- [           main] kafka.server.BrokerMetadataCheckpoint    : Failed to write meta.properties due to

java.nio.file.AccessDeniedException: C:\Users\ddrop\AppData\Local\Temp\spring.kafka.bf8e2b62-a1f2-4092-b292-a15e35bd31ad18378079390566696446
    at java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:89) ~[na:na]
    at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103) ~[na:na]
    at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:108) ~[na:na]
Run Code Online (Sandbox Code Playgroud)

可通过 spring Initializr + 添加“Spring Kafka”重现:https://start.spring.io/#!type =maven-project&language=java&platformVersion=2.6.1&packaging=jar&jvmVersion=11&groupId=com.example&artifactId=demo&name=demo&description=Demo%20project %20for%20Spring%20Boot&packageName=com.example.demo&dependencies=kafka

然后执行以下测试类:

package com.example.demo;

import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;

@SpringBootTest
@EmbeddedKafka
class ApplicationTest {

    @Test
    void run() {
        int i = 1 + 1; // just a line of code to set a debug-point
    }
}
Run Code Online (Sandbox Code Playgroud)

kafka.version在 pom.xml 的属性中固定到 2.8.1 时,我没有出现此错误。

似乎原因在于 Kafka 本身,但我很难弄清楚是否是 spring-kafka 通过 EmbeddedKafka 错误地初始化了 Kafka,或者 Kafka 本身是否是这里的罪魁祸首。

有人有主意吗?我是否缺少要设置的测试参数?

And*_*Gee 12

作为解决方法,请将修补后的https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/utils/Utils.java添加到项目测试源中(在同一个包下)直到 Kafka 3.0.1 随 Spring Boot 一起发布。- 当然,发生这种情况时删除这个临时类。


Art*_*lan 6

Apache Kafka 端的已知错误。从 Spring 的角度来看,没有什么可做的。请在此处查看更多信息:https ://github.com/spring-projects/spring-kafka/discussions/2027 。这里: https: //issues.apache.org/jira/browse/KAFKA-13391

您需要等到 Apache Kafka 出现3.0.1,或者不使用嵌入式 Kafka,而仅依赖于 Testcontainers,或者完全外部的 Apache Kafka 代理。


DRo*_*elt 5

另一种方法是仅针对 Windows 环境确定 kafka 2.8.1。

这假设您生成用于生产用途的 jar 的构建环境不是 Windows 盒子

在pom.xml中添加

  <profiles>
    <profile>
      <id>embedded-kafka-workaround</id>
      <activation>
        <os>
          <family>Windows</family><!-- super hacky workaround for /sf/answers/4920483781/ . "if os = windows" condition until kafka 3.0.1 or 3.1.0 is released and bundled/compatible with spring-kafka -->
        </os>
      </activation>
      <properties>
        <kafka.version>2.8.1</kafka.version><!-- only locally and when in windows, kafka 3.0.0 fails to start embedded kafka -->
      </properties>
    </profile>
  </profiles>

Run Code Online (Sandbox Code Playgroud)


DRo*_*elt 4

虽然我会等到 kafka3.0.1发布,但对于那些只想切换到 Testcontainers 但不熟悉如何设置它们的人来说:

基于此initlzr的示例:https://start.spring.io/# !type=maven-project&language=java&platformVersion=2.6.1&packaging=jar&jvmVersion=11&groupId=com.example&artifactId=demo&name=demo&description=Demo%20project%20for%20Spring% 20Boot&packageName=com.example.demo&dependencies=kafka,testcontainers

可运行的应用程序

package com.example.demo;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;

import java.time.LocalDateTime;
import java.util.stream.IntStream;

@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @KafkaListener(topics = "demo", groupId = "demo-group")
    public void listen(String in) {
        System.out.println("Processing: " + in);
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("demo", 5, (short) 1);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            IntStream.range(0, 10).forEach(i -> {
                        String event = "foo" + i;
                        System.out.println("Sending " + event);
                        template.send("demo", i + "", event);

                    }
            );

        };
    }
}
Run Code Online (Sandbox Code Playgroud)

带有测试容器的测试代码,其中 Kafka 将在 Docker 中启动

package com.example.demo;

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
@SpringBootTest
class DemoApplicationTest {

    @Autowired
    ApplicationRunner applicationRunner;

    @Container
    public static KafkaContainer kafkaContainer =
            new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));

    @BeforeAll
    static void setUp() {
        kafkaContainer.start();
    }

    @DynamicPropertySource
    static void addDynamicProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);
    }


    @Test
    void run() throws Exception {
        applicationRunner.run(null);
    }
}
Run Code Online (Sandbox Code Playgroud)

对 pom.xml 进行必要的添加

    <dependencies>
...
        </dependency>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>junit-jupiter</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>kafka</artifactId>
            <scope>test</scope>
        </dependency>
...
    </dependencies>


    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.testcontainers</groupId>
                <artifactId>testcontainers-bom</artifactId>
                <version>1.16.2</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
Run Code Online (Sandbox Code Playgroud)