标签: spring-kafka

SpringBoot/spring-kafka 应用程序中的 Autowired KafkaTemplate 抛出空指针

我正在尝试使用Spring Boot 应用程序中的spring-kafka KafkaTemplate 将消息写入 Kafka 主题。

我创建了一个 KafkaConfig 类:

@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${kafka.broker.address}")
    private String brokerAddress;

    @Bean
    public ProducerFactory<Integer, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return props;
    }

    @Bean
    public KafkaTemplate<Integer, String> kafkaTemplate() {
        return new KafkaTemplate<Integer, String>(producerFactory());
    }

}
Run Code Online (Sandbox Code Playgroud)

...并在我给 Kafka 写信的班级中自动连接了 KafkaTemplate:

@Autowired
private KafkaTemplate<Integer, String> …
Run Code Online (Sandbox Code Playgroud)

java spring spring-boot spring-kafka

1
推荐指数
1
解决办法
8784
查看次数

spring-kafka - 如何从头开始阅读一个主题,同时从头开始阅读另一个主题?

我正在写一个spring-kafka应用程序,我需要阅读2个主题:test1和test2:

public class Receiver {

    private static final Logger LOGGER = LoggerFactory
            .getLogger(Receiver.class);

    @KafkaListener(id = "bar", topicPartitions =
{ @TopicPartition(topic = "test1", partitions = { "0" }),
  @TopicPartition(topic = "test2", partitions = { "0" })})
    public void receiveMessage(String message) {
        LOGGER.info("received message='{}'", message);
    }
}
Run Code Online (Sandbox Code Playgroud)

我的配置如下所示:

@Configuration
@EnableKafka
public class ReceiverConfig {

    @Value("${kafka.bootstrap.servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections
        // …
Run Code Online (Sandbox Code Playgroud)

spring-boot spring-kafka

1
推荐指数
2
解决办法
4765
查看次数

无效的 UTF-8 中间字节 0x72

我在 spring-kafka 中使用 JsonSerializer 和 JsonDeserializer 在生成消息时设置值序列化器。该消息有一个字段(orgName),其中包含一个特殊字符(德语变音)。我该如何处理这个特殊字符?我知道 JsonDeserializer 使用 jackson 并且 jackson 支持 utf-8。JsonDeserializer 因为它而抛出这个错误:

Caused by: com.fasterxml.jackson.databind.JsonMappingException: Invalid UTF-8 middle byte 0x72
 at [Source: [B@403d4534; line: 1, column: 128]
 at [Source: [B@403d4534; line: 1, column: 116] (through reference chain: com.abc-company.kafka.JobRequest["orgName"])
    at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:388) ~[jackson-databind-2.8.5.jar:2.8.5]
    at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:348) ~[jackson-databind-2.8.5.jar:2.8.5]
    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.wrapAndThrow(BeanDeserializerBase.java:1599) ~[jackson-databind-2.8.5.jar:2.8.5]
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:359) ~[jackson-databind-2.8.5.jar:2.8.5]
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:148) ~[jackson-databind-2.8.5.jar:2.8.5]
    at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:1626) ~[jackson-databind-2.8.5.jar:2.8.5]
    at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1237) ~[jackson-databind-2.8.5.jar:2.8.5]
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:86) ~[spring-kafka-1.1.2.RELEASE.jar:na]
Run Code Online (Sandbox Code Playgroud)

jackson spring-kafka

1
推荐指数
1
解决办法
1万
查看次数

将多个主题名称与 KafkaListener 注释一起使用

下面有一个类似的问题:


单个 Spring 的 KafkaConsumer 侦听器可以收听多个主题吗?

所以我现在明白我可以为 KafkaListener 注释的主题参数提供一个字符串数组,但是我想知道以下内容:

  1. 如何从属性文件中获取主题名称作为字符串数组?
  2. 这个来自多个主题的阅读如何影响偏移量?客户端(spring kafka)会维护每个主题的偏移量吗?

apache-kafka spring-kafka

1
推荐指数
1
解决办法
2554
查看次数

Spring Kafka Consumer - 打印 Kafka 滞后信息

我创建了一个从主题读取的 spring kafka 消费者。有没有办法打印类似于我们打印分区信息的滞后信息?

java kafka-consumer-api spring-kafka

1
推荐指数
1
解决办法
2496
查看次数

即使我在生产者配置中指定了压缩类型,kafka 代理也没有压缩我更大尺寸的消息

下面是我的生产者配置,如果您看到它们的压缩类型为 gzip ,即使我提到了压缩类型,为什么消息没有发布并且它失败了

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, edi856KafkaConfig.getBootstrapServersConfig());
        props.put(ProducerConfig.RETRIES_CONFIG, edi856KafkaConfig.getRetriesConfig());
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, edi856KafkaConfig.getBatchSizeConfig());
        props.put(ProducerConfig.LINGER_MS_CONFIG, edi856KafkaConfig.getIntegerMsConfig());
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, edi856KafkaConfig.getBufferMemoryConfig());
        ***props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");***
        props.put(Edi856KafkaProducerConstants.SSL_PROTOCOL, edi856KafkaConfig.getSslProtocol());
        props.put(Edi856KafkaProducerConstants.SECURITY_PROTOCOL, edi856KafkaConfig.getSecurityProtocol());
        props.put(Edi856KafkaProducerConstants.SSL_KEYSTORE_LOCATION, edi856KafkaConfig.getSslKeystoreLocation());
        props.put(Edi856KafkaProducerConstants.SSL_KEYSTORE_PASSWORD, edi856KafkaConfig.getSslKeystorePassword());
        props.put(Edi856KafkaProducerConstants.SSL_TRUSTSTORE_LOCATION, edi856KafkaConfig.getSslTruststoreLocation());
        props.put(Edi856KafkaProducerConstants.SSL_TRUSTSTORE_PASSWORD, edi856KafkaConfig.getSslTruststorePassword());
        **props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");** 
Run Code Online (Sandbox Code Playgroud)

错误如下

org.apache.kafka.common.errors.RecordTooLargeException: The message is 1170632 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
2017-12-07_12:34:10.037 [http-nio-8080-exec-1] ERROR c.tgt.trans.producer.Edi856Producer - Exception while writing mesage to topic= '{}'
org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.RecordTooLargeException: The message is 1170632 bytes …
Run Code Online (Sandbox Code Playgroud)

jms apache-kafka kafka-consumer-api kafka-producer-api spring-kafka

1
推荐指数
1
解决办法
1239
查看次数

如何增加每批Spring Kafka Consumer消耗的消息数?

我正在构建一个Kafka Consumer应用程序,该应用程序使用来自Kafka Topic的消息并执行数据库更新任务。每天都会大批量生产消息-因此,该主题在10分钟内加载了大约100万条消息。主题有8个分区。

Spring Kafka使用者(用@KafkaListener注释并使用ConcurrentKafkaListenerContainerFactory注释)在极短的时间内触发。

批处理大小有时仅为1或2条消息。如果它可以一次使用大约1000条消息并一起处理(例如,我可以在一个更新SQL中更新数据库),而不是为每条消息连接到数据库,则将有助于提高性能。

我已经尝试在工厂中减少并发,以避免多个线程消耗较少的消息。

我还将Kafka的server.properties中的socket.send.buffer.bytes属性从102400增加到1024000。

这些步骤没有增加批处理大小。

我还有其他配置可以用来增加消费者的浴室大小吗?

java apache-kafka spring-boot spring-kafka

1
推荐指数
1
解决办法
2407
查看次数

Spring Kafka lib 与原生 Kafka Java API 的区别

对于 Java/Kotlin Spring 启动应用程序,如果我想向 Kafka 发送消息或使用来自 Kafka 的消息。你会推荐使用 Spring Kafka 库还是只使用 Kafka Java API。

不太确定 Spring 是否提供更多好处或只是一个包装器?对于 Spring,它们提供了许多注释,当出现运行时错误时,这些注释看起来更神奇。

想听听一些意见。

apache-kafka spring-kafka

1
推荐指数
1
解决办法
1359
查看次数

如何定义 StreamsBuilderFactoryBean 的两个实例

我是 Spring Kafka 的新手。出于某种原因,我想创建两个 StreamsBuilderFactoryBean,如您所见,我定义了两个 StreamsBuilderFactoryBean,一个名为“ commonDSLBuilder”,另一个名为“ ”,propertyDSLBuilder带有props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4). 所以“ commonDSLBuilde”只创建一个消费者,而“ propertyDSLBuilder”创建四个消费者。

 @Configuration
@EnableKafka
public class KafkaStreamsConfig {
    private static final Logger log = LoggerFactory.getLogger(KafkaStreamsConfig.class);

    @Value("${spring.kafka.stream.application-id}")
    private String applicationId;

    @Bean(name = "commonDSLBuilder")
    public StreamsBuilderFactoryBean commonDSLBuilder() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        StreamsConfig streamsConfig = new StreamsConfig(props);
        StreamsBuilderFactoryBean streamsBuilder = new StreamsBuilderFactoryBean(streamsConfig);
        streamsBuilder.setSingleton(Boolean.FALSE);
        return streamsBuilder;
    }

    @Bean(name = "propertyDSLBuilder")
    public StreamsBuilderFactoryBean propertyDSLBuilder() {
        Map<String, Object> …
Run Code Online (Sandbox Code Playgroud)

spring apache-kafka apache-kafka-streams spring-kafka

1
推荐指数
1
解决办法
1848
查看次数

Spring Kafka-如何使用@KafkaListener重试

来自Twitter的问题:

只是尝试找出一个与KafkaListener和AckMode.MANUAL_IMMEDIATE一起使用的spring-kafka 2.1.7的简单示例,以重试上一个失败的消息。

https://twitter.com/tolbier/status/1028936942447149056

spring-boot spring-kafka

1
推荐指数
1
解决办法
5685
查看次数