我在 Spring Boot 中有卡夫卡处理程序:
@KafkaListener(topics = "topic-one", groupId = "response")
public void listen(String response) {
myService.processResponse(response);
}
Run Code Online (Sandbox Code Playgroud)
例如,生产者每秒发送一条消息。但myService.processResponse工作10秒。我需要处理每条消息并myService.processResponse在新线程中开始。我可以创建我的执行者并将每个响应委托给它。但我认为 kafka 中还有其他配置可供使用。我找到了2个:
1)添加concurrency = "5"到@KafkaListener注释 - 它似乎有效。但我不确定有多正确,因为我有第二种方法:
2)我可以创建ConcurrentKafkaListenerContainerFactory并设置它ConsumerFactory并且concurrency
我不明白这些方法之间的区别?concurrency = "5"只需添加到注释就足够了@KafkaListener还是我需要创建ConcurrentKafkaListenerContainerFactory?
或者我根本不明白什么,还有其他方法吗?
java apache-kafka spring-boot kafka-consumer-api spring-kafka
我正在使用 spring kafka 开发一个 Spring boot 应用程序,该应用程序侦听 kafka 的单个主题,然后隔离各个类别的记录,从中创建一个 json 文件并将其上传到 AWS S3。
我在 Kafka 主题中收到大量数据,我需要确保 json 文件分块得足够大,以限制上传到 S3 的 json 数量。
以下是我application.yml对 kafka 消费者的配置。
spring:
kafka:
consumer:
group-id: newton
auto-offset-reset: earliest
fetch-max-wait:
seconds: 1
fetch-min-size: 500000000
max-poll-records: 50000000
value-deserializer: com.forwarding.application.consumer.model.deserializer.MeasureDeserializer
Run Code Online (Sandbox Code Playgroud)
我创建了一个监听器来连续阅读该主题。
即使使用上述配置,我在控制台中收到的记录如下:
2019-03-27T15:25:56.02+0530 [APP/PROC/WEB/0] OUT 2019-03-27 09:55:56.024 INFO 8 --- [ntainer#0-0-C-1] c.s.n.f.a.s.impl.ConsumerServiceImpl : Time taken(ms) 56. No Of measures: 60
2019-03-27T15:25:56.21+0530 [APP/PROC/WEB/2] OUT 2019-03-27 09:55:56.210 INFO 8 --- [ntainer#0-0-C-1] c.s.n.f.a.s.impl.ConsumerServiceImpl : Time taken(ms) 80. No Of measures: …Run Code Online (Sandbox Code Playgroud) 我试图按照https://github.com/spring-projects/spring-kafka/issues/361将主题名称从 .yml 文件传递到 @kafkalistener。但编译器抛出以下错误
Type mismatch.
Required:
Array<String>
Found:
String
Unresolved reference: spring
Run Code Online (Sandbox Code Playgroud)
下面是接收者代码
@Component
class Receiver {
companion object {
private val LOGGER = LoggerFactory.getLogger(Receiver::class.java)
}
@Autowired
private val taskExecutor: TaskExecutor? = null
@Autowired
private val applicationContext: ApplicationContext? = null
@KafkaListener(topics = "#{'${spring.kafka.topics}'.split(',')}")
fun receive(@Header(KafkaHeaders.RECEIVED_TOPIC) topic: String) {
}
}
Run Code Online (Sandbox Code Playgroud)
下面是我的 build.gradle 文件
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
plugins {
id("org.springframework.boot") version "2.1.7.RELEASE"
id("io.spring.dependency-management") version "1.0.8.RELEASE"
kotlin("jvm") version "1.2.71"
kotlin("plugin.spring") version "1.2.71"
}
group = "com.example"
version = "0.0.1-SNAPSHOT"
java.sourceCompatibility …Run Code Online (Sandbox Code Playgroud) 是否可以将 Kafka 配置为在单个 Spring Boot 应用程序中使用两个独立的集群?
用例:我有两个带有副本+zookeeper的集群:
server1.example.com,server2.example.com,server3.example.comtarget-server1.example.com,target-server2.example.com,target-server3.example.com我需要使用该消息,Cluster #1然后根据该数据进行一些计算,并生成Cluster #2主题结果。有没有办法在单个 Spring 应用程序中配置 Kafka 来处理这种方法?
我创建了一组带有嵌入式 kafka (spring-kafka-test) 的测试 (JUnit 5),当我有时(并非总是)运行它们时,我在单次运行的一个或多个测试中得到“主题 'some_name' 已存在”。
所有测试都使用相同的主题名称(我不想为每个测试更改该名称),测试类具有 DirtiesContext 注释(AFTER_EACH_TEST_METHOD)。我不确定这个问题的原因是什么,以及如何解决它。
@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = {"some_name"}, ports = 9092)
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
@ActiveProfiles("test")
public class RemovalKafkaTestIT {
private EmbeddedKafkaBroker embeddedKafkaBroker = new EmbeddedKafkaBroker(1, true, TOPIC);
private final static String SERVER_ADDRES = "127.0.0.1:9092";
private Consumer<String, String> prepareConsumer() {
Map<String, Object> configsConsumer = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker));
configsConsumer.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
configsConsumer.put("bootstrap.servers", SERVER_ADDRES);
Consumer<String, String> consumer = new DefaultKafkaConsumerFactory<>(configsConsumer, new StringDeserializer(), new StringDeserializer()).createConsumer();
consumer.subscribe(singleton("some_name"));
return consumer;
}
@Test
public void …Run Code Online (Sandbox Code Playgroud) 在kafka-client 2.1.0中,client.dns.lookup可用。以下是每个选项的说明。
use_all_dns_ips
当查找返回主机名的多个 IP 地址时,在连接失败之前将尝试连接所有这些 IP 地址
仅限resolve_canonical_bootstrap_servers_only
每个条目都将被解析并扩展为规范名称列表
他们不是都使用dns吗?use_all_dns_ips和resolve_canonical_bootstrap_servers_only之间有什么区别?
apache-kafka kafka-consumer-api kafka-producer-api spring-kafka
我正在发送一个 avro 对象 User 并在侦听器中接收它。这是配置
@Bean
public ProducerFactory<String, User> userProducerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, User> userKafkaTemplate() {
return new KafkaTemplate<>(userProducerFactory());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "avro");
return props;
}
@Bean
public ConsumerFactory<String, User> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
new AvroDeserializer<>(User.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, User> kafkaListenerContainerFactory() { …Run Code Online (Sandbox Code Playgroud) 几天前我问了一个关于存根 kafka.send() 方法的未来响应的问题。@kriegaex在这里正确回答并解释了这一点\n虽然我遇到了另一个问题,但我如何测试这个未来响应的 onSuccess 和 onFailure 回调。这是正在测试的代码。
\n\nimport org.springframework.kafka.core.KafkaTemplate;\nimport org.springframework.kafka.support.SendResult;\nimport org.springframework.util.concurrent.ListenableFuture;\nimport org.springframework.util.concurrent.ListenableFutureCallback;\n\npublic class KakfaService {\n\n private final KafkaTemplate<String, String> kafkaTemplate;\n private final LogService logService;\n\n public KakfaService(KafkaTemplate kafkaTemplate, LogService logService){\n this.kafkaTemplate = kafkaTemplate;\n this.logService = logService;\n }\n\n public void sendMessage(String topicName, String message) {\n ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, message);\n future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {\n\n @Override\n public void onSuccess(SendResult<String, String> result) {\n LogDto logDto = new LogDto();\n logDto.setStatus(StatusEnum.SUCCESS);\n logService.create(logDto)\n }\n @Override\n public void onFailure(Throwable ex) {\n LogDto logDto = …Run Code Online (Sandbox Code Playgroud) 我参考了此处发布的示例。我正在尝试一起运行多个 Spring Cloud Stream 应用程序。这里第一个的输出作为其他的输入。以下是我正在尝试做的事情。
@Bean
public Function<KStream<FormUUID, FormData>, KStream<UUID, Application>> process()
{
//do some processing here and return
}
// read output from above process and join it with an event stream
@Bean
public BiConsumer<KStream<UUID, ProcessEvent>, KTable<UUID, Application>> listen()
{
return (eventStream,appTable )-> eventStream
.join(appTable, (event, app) -> app).foreach((k, app) -> app.createQuote());
}
Run Code Online (Sandbox Code Playgroud)
application.yml 如下所示
spring.cloud:
function: process;listen
stream:
kafka.streams:
bindings:
process-in-0.consumer.application-id: form-aggregator
listen-in-0.consumer.application-id: event-processor
listen-in-1.consumer.application-id: event-processor
binder.configuration:
default.key.serde: org.springframework.kafka.support.serializer.JsonSerde
default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
spring.json.key.default.type: com.xxx.datamapper.domain.FormUUID
spring.json.value.default.type: com.xxx.datamapper.domain.FormData
commit.interval.ms: 1000 …Run Code Online (Sandbox Code Playgroud) 我正在使用 Spring Boot 开始使用 Apache Kafka。我想实现以下事实:同一服务的两个实例以循环方式消耗同一主题的消息。
因此,实例 1 接收主题的第一条消息,实例 2 接收第二条消息,实例 1 接收第三条消息,依此类推。
这是我当前的配置:
spring:
kafka:
consumer:
bootstrap-servers: rommel.local:9092
group-id: core
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
enable-auto-commit: false
producer:
bootstrap-servers: rommel.local:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
listener:
missing-topics-fatal: false
ack-mode: record
Run Code Online (Sandbox Code Playgroud)
这是我的听众:
@KafkaListener(topics = "new-topic", groupId = "core")
public void consume(String payload) {
System.out.println("Data received: " + payload);
}
Run Code Online (Sandbox Code Playgroud)
问题是,当我运行应用程序的两个实例时,只有一个实例收到该主题的消息。
我在日志中意识到了这一点:
Instance 1:
core: partitions assigned: [new-topic-0]
Run Code Online (Sandbox Code Playgroud)
Instance 2:
core: partitions assigned: []
Run Code Online (Sandbox Code Playgroud)
那么当我杀死实例1时。被new-topic-0分配给实例 2。
那么如何实现两个实例循环获取同一主题消息的场景呢?
谢谢!!
spring-kafka ×10
apache-kafka ×6
spring-boot ×5
java ×3
avro ×1
groovy ×1
junit5 ×1
kotlin ×1
spock ×1
unit-testing ×1