我正在 Spring Boot 应用程序中使用 spring-kaka-2.2.0 编写集成测试,我几乎成功了,我的测试用例仍然返回 true,但在那之后我仍然看到多个错误。
2019-02-21 11:12:35.434 ERROR 5717 --- [ Thread-7] kafka.server.ReplicaManager : [ReplicaManager broker=0] Error while writing to highwatermark file in directory /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645
org.apache.kafka.common.errors.KafkaStorageException: Error while writing to checkpoint file /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645/replication-offset-checkpoint
Caused by: java.io.FileNotFoundException: /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645/replication-offset-checkpoint.tmp (No such file or directory)
Run Code Online (Sandbox Code Playgroud)
测试配置
@EnableKafka
@TestConfiguration
public class KafkaProducerConfigTest {
@Bean
public EmbeddedKafkaBroker embeddedKafkaBroker() {
return new EmbeddedKafkaBroker(1,false,2,"test-events");
}
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBroker().getBrokersAsString());
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); …Run Code Online (Sandbox Code Playgroud) java apache-kafka spring-boot spring-kafka spring-kafka-test
我们有一个集成测试,我们使用 EmbeddedKafka 并向某个主题生成一条消息,我们的应用程序处理该消息,然后将结果发送到我们使用并断言输出的第二个主题。在 CI 中,这可能在 2/3 的时间内有效,但我们会遇到KafkaTestUtils.getSingleRecord抛出异常的情况java.lang.IllegalStateException: No records found for topic(参见下面的 [1])。
为了尝试解决此问题,我ContainerTestUtils.waitForAssignment为注册表中的每个侦听器容器添加了(请参阅下面的 [2])。在 CI 中成功运行几次后,我看到了一个新的异常:java.lang.IllegalStateException: Expected 1 but got 0 partitions。现在我想知道这是否真的是原始异常“未找到记录”的根本原因。
有什么想法可以帮助解决这里的随机故障吗?我将不胜感激任何有关如何排除故障的建议。
spring-kafka 和 spring-kafka-test v2.6.4。
编辑:添加newConsumer以供参考。
我们的设置示例:
@SpringBootTest
@RunWith(SpringRunner.class)
@DirtiesContext
@EmbeddedKafka(
topics = { "topic1","topic2" },
partitions = 1,
brokerProperties = {"listeners=PLAINTEXT://localhost:9099", "port=9099"})
public class IntegrationTest {
@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker;
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@Test
public void testExample() {
try (Consumer<String, String> consumer = newConsumer()) { …Run Code Online (Sandbox Code Playgroud) e:这已通过 Spring Boot 2.6.5 修复(请参阅https://github.com/spring-projects/spring-boot/issues/30243)
自从升级到 Spring Boot 2.6.X(在我的例子中:2.6.1)以来,我有多个项目现在在 Windows 上的单元测试失败,无法启动EmbeddedKafka,但在 Linux 上运行
有多个错误,但这是抛出的第一个错误
...
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: …Run Code Online (Sandbox Code Playgroud) 我有一个简单的 Spring Boot 应用程序,它从 Kafka 读取并写入 Kafka。我写了一个SpringBootTestusing anEmbeddedKafka来测试所有这些。
主要问题是:有时测试失败是因为测试过早发送Kafka消息。这样,在 Spring 应用程序(或者KafkaListener准确地说)准备就绪之前,消息已经写入 Kafka 。由于侦听器从latest偏移量中读取(我不想为我的测试更改任何配置 - 除了 bootstrap.servers),它不会收到该测试中的所有消息。
有谁知道我如何在测试中知道KafkaListener已准备好接收消息?
我能想到的唯一方法是等到/health可用,但我不知道我是否可以确定这意味着KafkaListener完全准备好了。
任何帮助是极大的赞赏!
此致。
spring apache-kafka spring-boot spring-kafka spring-kafka-test
我正在尝试使用kafka,kafka-streams和cassandra对应用进行集成测试。但是,当我尝试设置测试类时,我遇到了2个错误:错误[main] BrokerMetadataCheckpoint:无法读取dir下的meta.properties文件错误[主要] KafkaServer:无法读取日志目录下的meta.properties
我正在使用spring-boot-starter 2.1.2,spring-boot-starter-test 2.1.2,spring-kafka 2.2.0,spring-kafka-test 2.2.0,apache.kafka-streams 2.1.0
尝试更改logs.dir和logs.dirs参数。使用@EnableKafka @EnableKafkaStreams
@RunWith(SpringRunner.class)
@SpringBootTest
@EmbeddedKafka(partitions = 3, controlledShutdown = false, count = 1, topics = {"zc.deviceposition"}, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092", "log.dir=/home/name/logs"})
@EmbeddedCassandra(timeout = 60000)
@CassandraDataSet(value = {"bootstrap_test.cql"}, keyspace = "statistics")
@ActiveProfiles("test")
@DirtiesContext
public class CassandraTripsAggregatorProcessorSupplierIntegrationTest {
@Test
public void someTest() {System.out.println("hello world");}
}
Run Code Online (Sandbox Code Playgroud)
我期望使用嵌入式kafka运行上下文,但是现在我收到一个错误,指出meta.properties不存在
我已经按照https://kafka.apache.org/24/documentation/streams/developer-guide/testing.html为 kafka 流应用程序编写了一个测试类 ,其代码是
import com.EventSerde;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.Properties;
public class KafkaStreamsConfigTest {
private TopologyTestDriver testDriver;
private TestInputTopic<String, Object> inputTopic;
private TestOutputTopic<String, Object> outputTopic;
private Serde<String> stringSerde = new Serdes.StringSerde();
private EventSerde eventSerde= new EventSerde();
private String key="test";
private Object value = "some value";
private Object expected_value = "real value";
String kafkaEventSourceTopic = "raw_events";
String kafkaEventSinkTopic = "processed_events";
String kafkaCacheSinkTopic = "cache_objects";
String applicationId = "my-app";
String …Run Code Online (Sandbox Code Playgroud) unit-testing apache-kafka apache-kafka-streams kafka-topic spring-kafka-test
我们正在使用 Cucumber 对外部应用程序进行一些集成测试,我们在测试@KafkaListener. 我们设法使用 EmbeddedKafka 并将数据生成到其中。
但是消费者永远不会收到任何数据,我们也不知道发生了什么。
这是我们的代码:
生产者配置
@Configuration
@Profile("test")
public class KafkaTestProducerConfig {
private static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
@Autowired
protected EmbeddedKafkaBroker embeddedKafka;
@Bean
public Map<String, Object> producerConfig() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
embeddedKafka.getBrokersAsString());
props.put(SCHEMA_REGISTRY_URL, "URL");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, GenericRecord> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfig());
}
@Bean
public KafkaTemplate<String, GenericRecord> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Run Code Online (Sandbox Code Playgroud)
消费者配置
@Configuration
@Profile("test")
@EnableKafka
public class KafkaTestConsumerConfig { …Run Code Online (Sandbox Code Playgroud) java apache-kafka spring-boot spring-kafka spring-kafka-test
我使用 EmbededKafka 实现了一系列集成测试,以测试使用 spring-kafka 框架运行的一个 Kafka 流应用程序。
流应用程序正在从 Kafka 主题读取消息,将其存储到内部状态存储中,进行一些转换并将其发送到另一个微服务到请求的主题中。当响应返回到响应主题时,它会从状态存储中检索原始消息,并根据某些业务逻辑将其转发到我们的下游系统之一,每个下游系统都有自己的主题。
集成测试只是测试业务条件的各种排列。
最初,测试被分成不同的班级。运行构建时,一个类中的测试与另一类中的测试发生冲突,并存在一些冲突异常。我没有花太多时间在这上面,只是将所有测试移到同一个类中。这解决了我从 gradle build 或从 intelij EDI 通过的所有测试的问题。
这是测试:
package au.nab.tlm.streams.integration;
import au.nab.tlm.streams.serde.EntitlementsCheckSerDes;
import au.nab.tlm.streams.test.support.MockEntitlementsCheckSerDes;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
@SpringBootTest
@ContextConfiguration(classes = {MyTopologiesIntegrationTest.TestKafkaConfig.class})
@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD)
@EmbeddedKafka(
ports = 9092,
partitions = 1,
topics = {
"topic-1.v1",
"topic-2.v1",
"topic-3.v1",
"topic-4.v1",
"topic-5.v1",
"topic-6.v1",
},
brokerProperties = {"transaction.state.log.replication.factor=1", "transaction.state.log.min.isr=1", "log.dir=/tmp/embedded-kafka"}
)
public class MyTopologiesIntegrationTest {
@Autowired
EmbeddedKafkaBroker kafkaBroker; …Run Code Online (Sandbox Code Playgroud) 我创建了以下测试类来使用 AvroSerializer 生成事件。
@SpringBootTest
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
@TestPropertySource(locations = ("classpath:application-test.properties"))
@ContextConfiguration(classes = { TestAppConfig.class })
@DirtiesContext
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class EntitlementEventsConsumerServiceImplTest {
@Autowired
EmbeddedKafkaBroker embeddedKafkaBroker;
@Bean
MockSchemaRegistryClient mockSchemaRegistryClient() {
return new MockSchemaRegistryClient();
}
@Bean
KafkaAvroSerializer kafkaAvroSerializer() {
return new KafkaAvroSerializer(mockSchemaRegistryClient());
}
@Bean
public DefaultKafkaProducerFactory producerFactory() {
Map<String, Object> props = KafkaTestUtils.producerProps(embeddedKafkaBroker);
props.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, false);
return new DefaultKafkaProducerFactory(props, new StringSerializer(), kafkaAvroSerializer());
}
@Bean
public KafkaTemplate<String, ApplicationEvent> kafkaTemplate() {
KafkaTemplate<String, ApplicationEvent> kafkaTemplate = new KafkaTemplate(producerFactory());
return kafkaTemplate;
}
} …Run Code Online (Sandbox Code Playgroud) java apache-kafka spring-kafka confluent-schema-registry spring-kafka-test
我使用 @EmbeddedKafka 进行了多个集成测试,在迁移到较新的 springboot 版本 2.1.8.RELEASE 后,日志中充满了这些堆栈跟踪。知道是什么原因造成的吗?
javax.management.InstanceAlreadyExistsException: kafka.server:type=app-info,id=0
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62)
at kafka.server.KafkaServer.startup(KafkaServer.scala:321)
at kafka.utils.TestUtils$.createServer(TestUtils.scala:132)
at kafka.utils.TestUtils.createServer(TestUtils.scala)
at org.springframework.kafka.test.EmbeddedKafkaBroker.afterPropertiesSet(EmbeddedKafkaBroker.java:223)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1837)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1774)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:405)
Run Code Online (Sandbox Code Playgroud) spring-kafka ×8
apache-kafka ×6
java ×3
spring-boot ×3
spring ×2
kafka-topic ×1
unit-testing ×1