标签: spring-kafka-test

Spring Kafka集成测试写入高水印文件时出错

我正在 Spring Boot 应用程序中使用 spring-kaka-2.2.0 编写集成测试,我几乎成功了,我的测试用例仍然返回 true,但在那之后我仍然看到多个错误。

2019-02-21 11:12:35.434 ERROR 5717 --- [       Thread-7] kafka.server.ReplicaManager              : [ReplicaManager broker=0] Error while writing to highwatermark file in directory /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645

org.apache.kafka.common.errors.KafkaStorageException: Error while writing to checkpoint file /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645/replication-offset-checkpoint
Caused by: java.io.FileNotFoundException: /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645/replication-offset-checkpoint.tmp (No such file or directory)
Run Code Online (Sandbox Code Playgroud)

测试配置

@EnableKafka
@TestConfiguration
public class KafkaProducerConfigTest {

@Bean
public EmbeddedKafkaBroker embeddedKafkaBroker() {
    return new EmbeddedKafkaBroker(1,false,2,"test-events");
}


@Bean
public ProducerFactory<String, Object> producerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBroker().getBrokersAsString());
    props.put(ProducerConfig.RETRIES_CONFIG, 0);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka spring-boot spring-kafka spring-kafka-test

16
推荐指数
2
解决办法
4408
查看次数

EmbeddedKafka w/ ContainerTestUtils.waitForAssignment 抛出:预期 1 但获得 0 个分区

我们有一个集成测试,我们使用 EmbeddedKafka 并向某个主题生成一条消息,我们的应用程序处理该消息,然后将结果发送到我们使用并断言输出的第二个主题。在 CI 中,这可能在 2/3 的时间内有效,但我们会遇到KafkaTestUtils.getSingleRecord抛出异常的情况java.lang.IllegalStateException: No records found for topic(参见下面的 [1])。

为了尝试解决此问题,我ContainerTestUtils.waitForAssignment为注册表中的每个侦听器容器添加了(请参阅下面的 [2])。在 CI 中成功运行几次后,我看到了一个新的异常:java.lang.IllegalStateException: Expected 1 but got 0 partitions。现在我想知道这是否真的是原始异常“未找到记录”的根本原因。

有什么想法可以帮助解决这里的随机故障吗?我将不胜感激任何有关如何排除故障的建议。

spring-kafka 和 spring-kafka-test v2.6.4。

编辑:添加newConsumer以供参考。

我们的设置示例:

@SpringBootTest
@RunWith(SpringRunner.class)
@DirtiesContext
@EmbeddedKafka(
    topics = { "topic1","topic2" },
    partitions = 1,
    brokerProperties = {"listeners=PLAINTEXT://localhost:9099", "port=9099"})
public class IntegrationTest {

  @Autowired
  private EmbeddedKafkaBroker embeddedKafkaBroker;

  @Autowired
  private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

  @Test
  public void testExample() {
    try (Consumer<String, String> consumer = newConsumer()) { …
Run Code Online (Sandbox Code Playgroud)

spring-kafka spring-kafka-test

13
推荐指数
1
解决办法
4435
查看次数

自 Spring Boot 2.6.X 以来,EmbeddedKafka 失败:AccessDeniedException: ..\AppData\Local\Temp\spring.kafka*

e:这已通过 Spring Boot 2.6.5 修复(请参阅https://github.com/spring-projects/spring-boot/issues/30243

自从升级到 Spring Boot 2.6.X(在我的例子中:2.6.1)以来,我有多个项目现在在 Windows 上的单元测试失败,无法启动EmbeddedKafka,但在 Linux 上运行

有多个错误,但这是抛出的第一个错误

...
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: …
Run Code Online (Sandbox Code Playgroud)

spring-kafka spring-kafka-test

13
推荐指数
4
解决办法
1万
查看次数

Spring KafkaListener:如何知道何时准备就绪

我有一个简单的 Spring Boot 应用程序,它从 Kafka 读取并写入 Kafka。我写了一个SpringBootTestusing anEmbeddedKafka来测试所有这些。

主要问题是:有时测试失败是因为测试过早发送Kafka消息。这样,在 Spring 应用程序(或者KafkaListener准确地说)准备就绪之前,消息已经写入 Kafka 。由于侦听器从latest偏移量中读取(我不想为我的测试更改任何配置 - 除了 bootstrap.servers),它不会收到该测试中的所有消息。

有谁知道我如何在测试中知道KafkaListener已准备好接收消息?

我能想到的唯一方法是等到/health可用,但我不知道我是否可以确定这意味着KafkaListener完全准备好了。

任何帮助是极大的赞赏!

此致。

spring apache-kafka spring-boot spring-kafka spring-kafka-test

8
推荐指数
2
解决办法
1451
查看次数

如何修复无法找到meta.properties的嵌入式kafka中的错误

我正在尝试使用kafka,kafka-streams和cassandra对应用进行集成测试。但是,当我尝试设置测试类时,我遇到了2个错误:错误[main] BrokerMetadataCheckpoint:无法读取dir下的meta.properties文件错误[主要] KafkaServer:无法读取日志目录下的meta.properties

我正在使用spring-boot-starter 2.1.2,spring-boot-starter-test 2.1.2,spring-kafka 2.2.0,spring-kafka-test 2.2.0,apache.kafka-streams 2.1.0

尝试更改logs.dir和logs.dirs参数。使用@EnableKafka @EnableKafkaStreams

@RunWith(SpringRunner.class)
@SpringBootTest
@EmbeddedKafka(partitions = 3, controlledShutdown = false, count = 1, topics = {"zc.deviceposition"}, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092", "log.dir=/home/name/logs"})
@EmbeddedCassandra(timeout = 60000)
@CassandraDataSet(value = {"bootstrap_test.cql"}, keyspace = "statistics")
@ActiveProfiles("test")
@DirtiesContext
public class CassandraTripsAggregatorProcessorSupplierIntegrationTest {
  @Test
  public void someTest() {System.out.println("hello world");}
}
Run Code Online (Sandbox Code Playgroud)

我期望使用嵌入式kafka运行上下文,但是现在我收到一个错误,指出meta.properties不存在

spring-boot-test embedded-kafka spring-kafka-test

7
推荐指数
1
解决办法
820
查看次数

Kafka Streams 测试:java.util.NoSuchElementException:未初始化的主题:“output_topic_name”

我已经按照https://kafka.apache.org/24/documentation/streams/developer-guide/testing.html为 kafka 流应用程序编写了一个测试类 ,其代码是

import com.EventSerde;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.Properties;

public class KafkaStreamsConfigTest {

private TopologyTestDriver testDriver;
private TestInputTopic<String, Object> inputTopic;
private TestOutputTopic<String, Object> outputTopic;

private Serde<String> stringSerde = new Serdes.StringSerde();
private EventSerde eventSerde= new EventSerde();

private String key="test";
private Object value = "some value";
private Object expected_value = "real value";

String kafkaEventSourceTopic = "raw_events";
String kafkaEventSinkTopic = "processed_events";
String kafkaCacheSinkTopic = "cache_objects";

String applicationId = "my-app";
String …
Run Code Online (Sandbox Code Playgroud)

unit-testing apache-kafka apache-kafka-streams kafka-topic spring-kafka-test

6
推荐指数
1
解决办法
1458
查看次数

Spring Kafka 测试 - 使用 EmbeddedKafka 在 @KafkaListener 中未接收数据

我们正在使用 Cucumber 对外部应用程序进行一些集成测试,我们在测试@KafkaListener. 我们设法使用 EmbeddedKafka 并将数据生成到其中。

但是消费者永远不会收到任何数据,我们也不知道发生了什么。

这是我们的代码:

生产者配置

@Configuration
@Profile("test")
public class KafkaTestProducerConfig {

    private static final String SCHEMA_REGISTRY_URL = "schema.registry.url";

    @Autowired
    protected EmbeddedKafkaBroker embeddedKafka;

    @Bean
    public Map<String, Object> producerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                embeddedKafka.getBrokersAsString());
        props.put(SCHEMA_REGISTRY_URL, "URL");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, GenericRecord> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public KafkaTemplate<String, GenericRecord> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}
Run Code Online (Sandbox Code Playgroud)

消费者配置

@Configuration
@Profile("test")
@EnableKafka
public class KafkaTestConsumerConfig { …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka spring-boot spring-kafka spring-kafka-test

6
推荐指数
1
解决办法
1013
查看次数

嵌入式 Kafka 测试随机失败

我使用 EmbededKafka 实现了一系列集成测试,以测试使用 spring-kafka 框架运行的一个 Kafka 流应用程序。

流应用程序正在从 Kafka 主题读取消息,将其存储到内部状态存储中,进行一些转换并将其发送到另一个微服务到请求的主题中。当响应返回到响应主题时,它会从状态存储中检索原始消息,并根据某些业务逻辑将其转发到我们的下游系统之一,每个下游系统都有自己的主题。

集成测试只是测试业务条件的各种排列。

最初,测试被分成不同的班级。运行构建时,一个类中的测试与另一类中的测试发生冲突,并存在一些冲突异常。我没有花太多时间在这上面,只是将所有测试移到同一个类中。这解决了我从 gradle build 或从 intelij EDI 通过的所有测试的问题。

这是测试:

package au.nab.tlm.streams.integration;

import au.nab.tlm.streams.serde.EntitlementsCheckSerDes;
import au.nab.tlm.streams.test.support.MockEntitlementsCheckSerDes;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;

@SpringBootTest
@ContextConfiguration(classes = {MyTopologiesIntegrationTest.TestKafkaConfig.class})
@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD)
@EmbeddedKafka(
        ports = 9092,
        partitions = 1,
        topics = {
                "topic-1.v1",
                "topic-2.v1",
                "topic-3.v1",
                "topic-4.v1",
                "topic-5.v1",
                "topic-6.v1",
        },
        brokerProperties = {"transaction.state.log.replication.factor=1", "transaction.state.log.min.isr=1", "log.dir=/tmp/embedded-kafka"}
)
public class MyTopologiesIntegrationTest {
    @Autowired
    EmbeddedKafkaBroker kafkaBroker; …
Run Code Online (Sandbox Code Playgroud)

apache-kafka spring-kafka spring-kafka-test

6
推荐指数
1
解决办法
7851
查看次数

SpringBoot 嵌入式 Kafka 使用 Avro Schema 生成事件

我创建了以下测试类来使用 AvroSerializer 生成事件。

@SpringBootTest
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
@TestPropertySource(locations = ("classpath:application-test.properties"))
@ContextConfiguration(classes = { TestAppConfig.class })
@DirtiesContext
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class EntitlementEventsConsumerServiceImplTest {


    @Autowired
    EmbeddedKafkaBroker embeddedKafkaBroker;

    @Bean
    MockSchemaRegistryClient mockSchemaRegistryClient() {
        return new MockSchemaRegistryClient();
    }

    @Bean
    KafkaAvroSerializer kafkaAvroSerializer() {
        return new KafkaAvroSerializer(mockSchemaRegistryClient());
    }

    @Bean
    public DefaultKafkaProducerFactory producerFactory() {
        Map<String, Object> props = KafkaTestUtils.producerProps(embeddedKafkaBroker);
        props.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, false);
        return new DefaultKafkaProducerFactory(props, new StringSerializer(), kafkaAvroSerializer());
    }

    @Bean
    public KafkaTemplate<String, ApplicationEvent> kafkaTemplate() {
        KafkaTemplate<String, ApplicationEvent> kafkaTemplate = new KafkaTemplate(producerFactory());
        return kafkaTemplate;
    }
} …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka spring-kafka confluent-schema-registry spring-kafka-test

6
推荐指数
2
解决办法
4564
查看次数

@EmbeddedKafka 尝试多次注册 AppInfo mbean?

我使用 @EmbeddedKafka 进行了多个集成测试,在迁移到较新的 springboot 版本 2.1.8.RELEASE 后,日志中充满了这些堆栈跟踪。知道是什么原因造成的吗?

javax.management.InstanceAlreadyExistsException: kafka.server:type=app-info,id=0
    at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
    at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
    at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62)
    at kafka.server.KafkaServer.startup(KafkaServer.scala:321)
    at kafka.utils.TestUtils$.createServer(TestUtils.scala:132)
    at kafka.utils.TestUtils.createServer(TestUtils.scala)
    at org.springframework.kafka.test.EmbeddedKafkaBroker.afterPropertiesSet(EmbeddedKafkaBroker.java:223)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1837)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1774)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:405)
Run Code Online (Sandbox Code Playgroud)

spring spring-kafka spring-kafka-test

5
推荐指数
1
解决办法
3098
查看次数