小编Alp*_*dız的帖子

Kafka Streams with Spring Boot

嘿伙计们,我想在我的春季启动项目中使用Kafka Streams实时处理.所以我需要Kafka Streams配置或者我想使用KStreams或KTable,但我在互联网上找不到例子.

我现在做生产者和消费者我想实时流式传输.

apache-kafka spring-boot apache-kafka-streams spring-kafka

8
推荐指数
1
解决办法
1万
查看次数

使用 Kafka/Spring Boot 发送对象

我只想将 BackAccount 对象发送给生产者,并且我想使用该对象。我有生产者和消费者配置,但它给了我错误。

我的 KafkaProducer 配置:

@Configuration
public class KafkaProducerConfig {


    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        return props;
    }

    @Bean
    public ProducerFactory<String, BankAccount> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, BankAccount> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }



}
Run Code Online (Sandbox Code Playgroud)

卡夫卡消费者配置:

@Configuration
public class KafkaConsumerConfig {


    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "json");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, …
Run Code Online (Sandbox Code Playgroud)

apache-kafka spring-boot kafka-consumer-api

5
推荐指数
0
解决办法
1万
查看次数

如何在 Java 中将 MultiPartfile 提供给 Reader?

我将 MultiPartFile 作为参数传递给我的函数并使用 CSVFormat 读取,但它给了我空指针异常,这是我的代码。

我想从文件中读取并将该值分配给我的对象。我的 CSV 文件是这样的:

发件人帐户、收件人帐户、金额、日期

123,654321,100,19-07-2018 12:13:00

public List<BankAccount> readCsv(MultipartFile file) throws IOException {
        List<BankAccount> bankAccountList = new ArrayList<BankAccount>();

        DateTimeFormatter dtf = DateTimeFormatter.ofPattern("dd-MM-yyyy HH:mm:ss");



        Reader in = new InputStreamReader(BankAccount.class.getClassLoader()
                .getResourceAsStream(file.getOriginalFilename()));

        Iterable<CSVRecord> parser = CSVFormat.EXCEL.withHeader("Sender account","Receiver Account","Amount","Date").parse(in);


            for (CSVRecord csvRecord : parser) {
            BankAccount bankAccount = new BankAccount();
            String senderAccount = csvRecord.get("Sender account");
            String receiverAccount = csvRecord.get("Receiver account");
            float amount = Float.parseFloat(csvRecord.get("Amount"));
            ZonedDateTime date = LocalDateTime.parse(csvRecord.get("Date"), dtf)
                    .atZone(ZoneId.of("Asia/Istanbul"));
            bankAccount.setFromId(senderAccount);
            bankAccount.setToId(receiverAccount);
            bankAccount.setDate(date);
            bankAccount.setBalance(amount);
            bankAccountList.add(bankAccount);
        }

        return bankAccountList;
    }
Run Code Online (Sandbox Code Playgroud)

java csv spring spring-boot

3
推荐指数
2
解决办法
7514
查看次数