我正在尝试使用Spring Boot 应用程序中的spring-kafka KafkaTemplate 将消息写入 Kafka 主题。
我创建了一个 KafkaConfig 类:
@Configuration
@EnableKafka
public class KafkaConfig {
@Value("${kafka.broker.address}")
private String brokerAddress;
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory());
}
}
Run Code Online (Sandbox Code Playgroud)
...并在我给 Kafka 写信的班级中自动连接了 KafkaTemplate:
@Autowired
private KafkaTemplate<Integer, String> …Run Code Online (Sandbox Code Playgroud) 我正在写一个spring-kafka应用程序,我需要阅读2个主题:test1和test2:
public class Receiver {
private static final Logger LOGGER = LoggerFactory
.getLogger(Receiver.class);
@KafkaListener(id = "bar", topicPartitions =
{ @TopicPartition(topic = "test1", partitions = { "0" }),
@TopicPartition(topic = "test2", partitions = { "0" })})
public void receiveMessage(String message) {
LOGGER.info("received message='{}'", message);
}
}
Run Code Online (Sandbox Code Playgroud)
我的配置如下所示:
@Configuration
@EnableKafka
public class ReceiverConfig {
@Value("${kafka.bootstrap.servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
// list of host:port pairs used for establishing the initial connections
// …Run Code Online (Sandbox Code Playgroud) 我在 spring-kafka 中使用 JsonSerializer 和 JsonDeserializer 在生成消息时设置值序列化器。该消息有一个字段(orgName),其中包含一个特殊字符(德语变音)。我该如何处理这个特殊字符?我知道 JsonDeserializer 使用 jackson 并且 jackson 支持 utf-8。JsonDeserializer 因为它而抛出这个错误:
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Invalid UTF-8 middle byte 0x72
at [Source: [B@403d4534; line: 1, column: 128]
at [Source: [B@403d4534; line: 1, column: 116] (through reference chain: com.abc-company.kafka.JobRequest["orgName"])
at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:388) ~[jackson-databind-2.8.5.jar:2.8.5]
at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:348) ~[jackson-databind-2.8.5.jar:2.8.5]
at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.wrapAndThrow(BeanDeserializerBase.java:1599) ~[jackson-databind-2.8.5.jar:2.8.5]
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:359) ~[jackson-databind-2.8.5.jar:2.8.5]
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:148) ~[jackson-databind-2.8.5.jar:2.8.5]
at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:1626) ~[jackson-databind-2.8.5.jar:2.8.5]
at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1237) ~[jackson-databind-2.8.5.jar:2.8.5]
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:86) ~[spring-kafka-1.1.2.RELEASE.jar:na]
Run Code Online (Sandbox Code Playgroud) 下面有一个类似的问题:
单个 Spring 的 KafkaConsumer 侦听器可以收听多个主题吗?
所以我现在明白我可以为 KafkaListener 注释的主题参数提供一个字符串数组,但是我想知道以下内容:
我创建了一个从主题读取的 spring kafka 消费者。有没有办法打印类似于我们打印分区信息的滞后信息?
下面是我的生产者配置,如果您看到它们的压缩类型为 gzip ,即使我提到了压缩类型,为什么消息没有发布并且它失败了
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, edi856KafkaConfig.getBootstrapServersConfig());
props.put(ProducerConfig.RETRIES_CONFIG, edi856KafkaConfig.getRetriesConfig());
props.put(ProducerConfig.BATCH_SIZE_CONFIG, edi856KafkaConfig.getBatchSizeConfig());
props.put(ProducerConfig.LINGER_MS_CONFIG, edi856KafkaConfig.getIntegerMsConfig());
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, edi856KafkaConfig.getBufferMemoryConfig());
***props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");***
props.put(Edi856KafkaProducerConstants.SSL_PROTOCOL, edi856KafkaConfig.getSslProtocol());
props.put(Edi856KafkaProducerConstants.SECURITY_PROTOCOL, edi856KafkaConfig.getSecurityProtocol());
props.put(Edi856KafkaProducerConstants.SSL_KEYSTORE_LOCATION, edi856KafkaConfig.getSslKeystoreLocation());
props.put(Edi856KafkaProducerConstants.SSL_KEYSTORE_PASSWORD, edi856KafkaConfig.getSslKeystorePassword());
props.put(Edi856KafkaProducerConstants.SSL_TRUSTSTORE_LOCATION, edi856KafkaConfig.getSslTruststoreLocation());
props.put(Edi856KafkaProducerConstants.SSL_TRUSTSTORE_PASSWORD, edi856KafkaConfig.getSslTruststorePassword());
**props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");**
Run Code Online (Sandbox Code Playgroud)
错误如下
org.apache.kafka.common.errors.RecordTooLargeException: The message is 1170632 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
2017-12-07_12:34:10.037 [http-nio-8080-exec-1] ERROR c.tgt.trans.producer.Edi856Producer - Exception while writing mesage to topic= '{}'
org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.RecordTooLargeException: The message is 1170632 bytes …Run Code Online (Sandbox Code Playgroud) jms apache-kafka kafka-consumer-api kafka-producer-api spring-kafka
我正在构建一个Kafka Consumer应用程序,该应用程序使用来自Kafka Topic的消息并执行数据库更新任务。每天都会大批量生产消息-因此,该主题在10分钟内加载了大约100万条消息。主题有8个分区。
Spring Kafka使用者(用@KafkaListener注释并使用ConcurrentKafkaListenerContainerFactory注释)在极短的时间内触发。
批处理大小有时仅为1或2条消息。如果它可以一次使用大约1000条消息并一起处理(例如,我可以在一个更新SQL中更新数据库),而不是为每条消息连接到数据库,则将有助于提高性能。
我已经尝试在工厂中减少并发,以避免多个线程消耗较少的消息。
我还将Kafka的server.properties中的socket.send.buffer.bytes属性从102400增加到1024000。
这些步骤没有增加批处理大小。
我还有其他配置可以用来增加消费者的浴室大小吗?
对于 Java/Kotlin Spring 启动应用程序,如果我想向 Kafka 发送消息或使用来自 Kafka 的消息。你会推荐使用 Spring Kafka 库还是只使用 Kafka Java API。
不太确定 Spring 是否提供更多好处或只是一个包装器?对于 Spring,它们提供了许多注释,当出现运行时错误时,这些注释看起来更神奇。
想听听一些意见。
我是 Spring Kafka 的新手。出于某种原因,我想创建两个 StreamsBuilderFactoryBean,如您所见,我定义了两个 StreamsBuilderFactoryBean,一个名为“ commonDSLBuilder”,另一个名为“ ”,propertyDSLBuilder带有props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4). 所以“ commonDSLBuilde”只创建一个消费者,而“ propertyDSLBuilder”创建四个消费者。
@Configuration
@EnableKafka
public class KafkaStreamsConfig {
private static final Logger log = LoggerFactory.getLogger(KafkaStreamsConfig.class);
@Value("${spring.kafka.stream.application-id}")
private String applicationId;
@Bean(name = "commonDSLBuilder")
public StreamsBuilderFactoryBean commonDSLBuilder() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsConfig streamsConfig = new StreamsConfig(props);
StreamsBuilderFactoryBean streamsBuilder = new StreamsBuilderFactoryBean(streamsConfig);
streamsBuilder.setSingleton(Boolean.FALSE);
return streamsBuilder;
}
@Bean(name = "propertyDSLBuilder")
public StreamsBuilderFactoryBean propertyDSLBuilder() {
Map<String, Object> …Run Code Online (Sandbox Code Playgroud) 来自Twitter的问题:
只是尝试找出一个与KafkaListener和AckMode.MANUAL_IMMEDIATE一起使用的spring-kafka 2.1.7的简单示例,以重试上一个失败的消息。