我正在尝试实现一个基于Spring Boot的Kafka消费者,它具有一些非常强大的消息传递保证,即使在出现错误的情况下也是如此.
我们当前的实施满足以下要求:
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRetryTemplate(retryTemplate());
final ContainerProperties containerProperties = factory.getContainerProperties();
containerProperties.setAckMode(AckMode.MANUAL_IMMEDIATE);
containerProperties.setErrorHandler(errorHandler());
return factory;
}
@Bean
public RetryTemplate retryTemplate() {
final ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(1000);
backOffPolicy.setMultiplier(1.5);
final RetryTemplate template = new RetryTemplate();
template.setRetryPolicy(new AlwaysRetryPolicy());
template.setBackOffPolicy(backOffPolicy);
return template;
}
@Bean
public ErrorHandler errorHandler() {
return new SeekToCurrentErrorHandler();
}
Run Code Online (Sandbox Code Playgroud)
但是,在这里,记录永远被消费者锁定.在某些时候,处理时间将超过max.poll.interval.ms,服务器将分区重新分配给其他消费者,从而创建一个副本.
假设max.poll.interval.ms等于5分钟(默认)并且故障持续30分钟,这将导致消息被处理ca. 6次.
另一种可能性是在N次重试(例如3次尝试)之后通过使用将消息返回到队列SimpleRetryPolicy.然后,将重播该消息(感谢SeekToCurrentErrorHandler)并且处理将从头开始,最多再次尝试5次.这导致延迟形成一系列例如
10 secs …Run Code Online (Sandbox Code Playgroud) messaging apache-kafka spring-retry spring-kafka exponential-backoff
在库更新之前的我的Spring Boot / Kafka应用程序中,我使用以下类org.telegram.telegrambots.api.objects.Update将消息发布到Kafka主题。现在,我使用以下内容org.telegram.telegrambots.meta.api.objects.Update。如您所见-它们具有不同的软件包。
重新启动应用程序后,我遇到了以下问题:
[org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: null
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition telegram.fenix.bot.update-0 at offset 4223. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'org.telegram.telegrambots.api.objects.Update' is not in the trusted packages: [java.util, java.lang, org.telegram.telegrambots.meta.api.objects]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all …Run Code Online (Sandbox Code Playgroud) 我有一个弹簧嵌入式kafka问题,我想用它来测试我的kafka发送器/接收器。
当我尝试使用以下方式运行测试时:
@RunWith(MockitoJUnitRunner.class)
@SpringBootTest
@DirtiesContext
public class myTestClass {
@ClassRule
public static EmbeddedKafkaRule embeddedKafka =
new EmbeddedKafkaRule(1, true, RECEIVER_TOPIC);
@Test public void test() {
System.out.println("@Test");
}
}
Run Code Online (Sandbox Code Playgroud)
我收到一个错误:
java.io.IOException: Failed to load C:\Users\username\AppData\Local\Temp\kafka-8251150311475880576 during broker startup
...
15:29:33.135 [main] ERROR kafka.log.LogManager - Shutdown broker because none of the specified log dirs from C:\Users\username\AppData\Local\Temp\kafka-8251150311475880576 can be created or validated
Run Code Online (Sandbox Code Playgroud)
我确定作为用户我可以访问此目录,并且在运行测试时可以看到kafka在temp文件夹上创建了此类目录(空),但仍然无法正常工作。
这是我的pom配置:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>2.2.0.RC1</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.0.RC1</version>
<scope>test</scope>
</dependency>
Run Code Online (Sandbox Code Playgroud)
appliacation.properties:
kafka.bootstrap-servers= ${spring.embedded.kafka.brokers}
Run Code Online (Sandbox Code Playgroud)
有趣的是,我从互联网上克隆了一个用于测试目的的kafka嵌入式示例,它工作正常,但是当我将其应用于我的项目时,它却像上面那样崩溃了。
有什么建议我在做什么错?
是否有可能将声明式交易管理(通过@Transactional)与@KafkaListener注释方法一起使用?例如,我想使用它来为每个侦听器定义单独的 tx 超时。我的设置如下:
交易管理器:
@Bean
@ConditionalOnBean(value = {HibernateTransactionManager.class})
public ChainedKafkaTransactionManager<Object, Object> chainedHibernateTm(KafkaTransactionManager<String, String> kafkaTransactionManager,
org.springframework.orm.hibernate5.HibernateTransactionManager hibernateTransactionManager) {
return new ChainedKafkaTransactionManager<>(
kafkaTransactionManager,
hibernateTransactionManager);
}
Run Code Online (Sandbox Code Playgroud)
卡夫卡监听器:
@KafkaListener(topic = "my_topic")
@Transactional(timeout = 5)
public void handleMessage(SomeMessage message){
}
Run Code Online (Sandbox Code Playgroud)
问题是 - KafkaMessageListenerContainer 在调用此类方法之前创建自己的事务 - 它使用自己的 TransactionTemplate:
@Nullable
private TransactionTemplate determineTransactionTemplate() {
return this.transactionManager != null
? new TransactionTemplate(this.transactionManager)
: null;
}
Run Code Online (Sandbox Code Playgroud)
未使用 TransactionInterceptor。那么如何为具体的 @KafkaListener 方法设置特定的 tx 超时呢?
首先,我必须说我对 confluence 并不熟悉。
我正在关注本教程:https://www.confluence.io/blog/schema-registry-avro-in-spring-boot-application-tutorial/,但我陷入了困境。
我无法为 Kafka 创建使用者,因为我收到错误:io.confluence.common.config.ConfigException:缺少没有默认值的所需配置“schema.registry.url”。
我在 yml 配置中找不到此架构属性。
Confluence 在本地运行:
$: confluent local start
zookeeper is already running. Try restarting if needed
kafka is already running. Try restarting if needed
schema-registry is already running. Try restarting if needed
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]
Starting control-center
control-center is [UP]
Run Code Online (Sandbox Code Playgroud)
在 Spring 中设置用户主题后,从控制中心我看到了不同的模式:
{
"connect.name": "ksql.users",
"fields": [
{
"name": "registertime",
"type": "long"
},
{
"name": "userid",
"type": …Run Code Online (Sandbox Code Playgroud) “我正在尝试将消息路由到 Kafka 中的死信主题,以防处理相应消息时出现任何失败。我已经为此功能设置了 SeektoCurrentErrorHandler 和 DeadLetterPublishingRecoverer。
消费者在执行此操作时抛出以下异常:
2020-08-07 12:09:38.841 ERROR 1 --- [ntainer#2-0-C-1] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='a6558a22-470d-4708-b297-814996a42045' and payload='{123, 34, 101, 118, 101, 110, 116, 78, 97, 109, 101, 34, 58, 34, 116, 101, 115, 116, 95, 101, 120, 1...' to topic test_execution.DLT and partition 2:
org.apache.kafka.common.errors.TimeoutException: Topic test_execution.DLT not present in metadata after 60000 ms.
2020-08-07 12:09:38.846 ERROR 1 --- [ntainer#2-0-C-1] o.s.k.l.DeadLetterPublishingRecoverer : Dead-letter publication failed for: ProducerRecord(topic=test_execution.DLT, partition=2, headers=RecordHeaders(headers = [RecordHeader(key = …Run Code Online (Sandbox Code Playgroud) java apache-kafka kafka-consumer-api kafka-producer-api spring-kafka
我正在尝试在我的 Spring Boot 应用程序中实现反应式 kafka 消费者,我正在查看这些示例:https : //github.com/reactor/reactor-kafka/blob/master/reactor-kafka-samples/src/main /java/reactor/kafka/samples/SampleScenarios.java
看起来反应式 kafka 中尚不支持 Spring
我了解 kafka 侦听器如何在 Spring 中的非反应式 kafka API 中工作:最简单的解决方案是为 ConcurrentKafkaListenerContainerFactory 和 ConsumerFactory 配置 bean,然后使用 @KafkaListener 注释和瞧
但是我现在不确定如何在 Spring 中正确使用反应式 kafka。
基本上我需要一个话题的听众。我应该创建某种循环或我自己的调度程序吗?或者也许我错过了一些东西。任何人都可以分享他们的知识和最佳实践吗?
java kafka-consumer-api reactive-kafka spring-kafka reactor-kafka
我们有一个使用 spring kafka 来读取消息的应用程序。应用程序的每个实例都必须有一个唯一的 groupId,并重置它并在重新启动时获取一个新的 groupId。GroupId 是通过 随机生成的${random.uuid}。
随机生成id的解决方案真的正确吗?
java apache-kafka spring-boot kafka-consumer-api spring-kafka
我正在研究Spring Boot 和 Apache Kafka - 尝试拥有用户定义的配置 -
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.example.demo.Consumer.consume(com.example.demo.User) throws java.io.IOException]
Bean [com.example.demo.Consumer@7cd4a8cc]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.example.demo.User] for GenericMessage [payload={"name":"Prateek","age":33}, headers={kafka_offset=7, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@32375775, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=users, kafka_receivedTimestamp=1613708323636, __TypeId__=[B@165438c, kafka_groupId=group_id}], failedMessage=GenericMessage [payload={"name":"Prateek","age":33}, headers={kafka_offset=7, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@32375775, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=users, kafka_receivedTimestamp=1613708323636, __TypeId__=[B@165438c, kafka_groupId=group_id}]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: …Run Code Online (Sandbox Code Playgroud) 我有简单的 spring boot kafka 监听器。我想测试一下。
使用嵌入式 kafka 进行测试 VS 使用测试容器 kafka 进行测试有何缺点?我熟悉测试容器,它提供了完全功能性的kafka。嵌入式kafka相对于容器有什么局限性?(请具体例子)