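Hey guys, I want to use Kafka Streams for real-time processing in my Spring Boot project. So I need a Kafka Streams configuration, or I want to use KStream or KTable, but I could not find an example on the internet.
I have already built a producer and a consumer; now I want to stream in real time.
I just want to send a BankAccount object through the producer and consume that object. I have producer and consumer configurations, but they give me an error.
My Kafka producer configuration: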
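import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration
public class KafkaProducerConfig {

    // String keys, BankAccount values serialized as JSON.
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, BankAccount> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, BankAccount> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}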
My Kafka consumer configuration:
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "json");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, …

I am passing a MultipartFile as a parameter to my function and reading it with CSVFormat, but it throws a NullPointerException. Here is my code.
I want to read from the file and assign the values to my object. My CSV file looks like this:
Sender account,Receiver account,Amount,Date
123,654321,100,19-07-2018 12:13:00
public List<BankAccount> readCsv(MultipartFile file) throws IOException {
    List<BankAccount> bankAccountList = new ArrayList<BankAccount>();
    DateTimeFormatter dtf = DateTimeFormatter.ofPattern("dd-MM-yyyy HH:mm:ss");
    Reader in = new InputStreamReader(BankAccount.class.getClassLoader()
            .getResourceAsStream(file.getOriginalFilename()));
    Iterable<CSVRecord> parser = CSVFormat.EXCEL.withHeader("Sender account","Receiver Account","Amount","Date").parse(in);
    for (CSVRecord csvRecord : parser) {
        BankAccount bankAccount = new BankAccount();
        String senderAccount = csvRecord.get("Sender account");
        String receiverAccount = csvRecord.get("Receiver account");
        float amount = Float.parseFloat(csvRecord.get("Amount"));
        ZonedDateTime date = LocalDateTime.parse(csvRecord.get("Date"), dtf)
                .atZone(ZoneId.of("Asia/Istanbul"));
        bankAccount.setFromId(senderAccount);
        bankAccount.setToId(receiverAccount);
        bankAccount.setDate(date);
        bankAccount.setBalance(amount);
        bankAccountList.add(bankAccount);
    }
    return bankAccountList;
}
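A likely cause of the NullPointerException in the method above: `getResourceAsStream(file.getOriginalFilename())` looks for a classpath resource with the uploaded file's name. It returns null when no such resource exists, and `new InputStreamReader(null)` then throws. The upload itself is available through `file.getInputStream()`. Two more things would probably fail next: the header is declared as "Receiver Account" but read as "Receiver account", and `withHeader(...)` with explicit names does not skip the header row unless `withSkipHeaderRecord()` is also set. The following is only a rough sketch of the same parsing on an arbitrary `InputStream`. To stay self-contained it uses plain `String.split` instead of Commons CSV, and `CsvUploadSketch` and `Transfer` are hypothetical stand-ins (`Transfer` for `BankAccount`), not names from the question.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

public class CsvUploadSketch {

    /** Hypothetical stand-in for the BankAccount class from the question. */
    static class Transfer {
        final String fromId;
        final String toId;
        final float amount;
        final ZonedDateTime date;

        Transfer(String fromId, String toId, float amount, ZonedDateTime date) {
            this.fromId = fromId;
            this.toId = toId;
            this.amount = amount;
            this.date = date;
        }
    }

    static final DateTimeFormatter DTF = DateTimeFormatter.ofPattern("dd-MM-yyyy HH:mm:ss");

    /**
     * Reads rows from the given stream. In the Spring method, the stream
     * would come from file.getInputStream(), not from the class loader.
     */
    static List<Transfer> readCsv(InputStream in) throws IOException {
        List<Transfer> result = new ArrayList<>();
        try (BufferedReader reader =
                 new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line = reader.readLine(); // first line is the header row: skip it
            while ((line = reader.readLine()) != null) {
                if (line.trim().isEmpty()) {
                    continue; // ignore blank lines
                }
                // Assumes simple, unquoted fields in the order shown in the question.
                String[] cols = line.split(",", -1);
                ZonedDateTime date = LocalDateTime.parse(cols[3].trim(), DTF)
                        .atZone(ZoneId.of("Asia/Istanbul"));
                result.add(new Transfer(cols[0].trim(), cols[1].trim(),
                        Float.parseFloat(cols[2].trim()), date));
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        String csv = "Sender account,Receiver account,Amount,Date\n"
                   + "123,654321,100,19-07-2018 12:13:00\n";
        List<Transfer> rows =
                readCsv(new ByteArrayInputStream(csv.getBytes(StandardCharsets.UTF_8)));
        System.out.println(rows.size() + " row(s), sender = " + rows.get(0).fromId);
    }
}
```

If you keep Commons CSV, the equivalent change would be to build the reader from `file.getInputStream()`, use the same header spelling in `withHeader(...)` and in every `get(...)` call, and add `withSkipHeaderRecord()` so the header line is not parsed as data.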