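I am using Oracle 10g, and I am trying to do some simple calculations and then save the result in a column. The actual tables have more columns, but the following is what I use in my query: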
CREATE TABLE "VACCINE_LOT"
(
"VACCINE_LOT_ID" NUMBER(10,0) NOT NULL ENABLE,
"DOSE" NUMBER(6,3),
"QUANTITY_ON_HAND" NUMBER(12,2) NOT NULL ENABLE
);
CREATE TABLE "IMMUNIZATION"
(
"VACCINE_LOT_ID" NUMBER(10,0),
"DOSE_MAGNITUDE" NUMBER(4,2)
);
CREATE TABLE "VACCINE_LOT_TRANSACTION"
(
"VACCINE_LOT_ID" NUMBER(10,0) NOT NULL ENABLE,
"QUANTITY" NUMBER(12,2) NOT NULL ENABLE
);
INSERT INTO vaccine_lot VALUES (100, 0.2, 120);
INSERT INTO immunization VALUES (100, 0.2);
INSERT INTO immunization VALUES (100, 0.3);
INSERT INTO vaccine_lot_transaction VALUES (100, 150);
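Immunization shots are drawn from vaccine lots. 'Dose_magnitude' is how much a particular immunization shot used. The 'Dose' column in vaccine_lot shows how much a standard immunization shot uses. So a standard shot might be 0.1cc, but a given immunization shot might actually use 0.2cc or even 0.05cc. The 'Quantity' column in vaccine_lot_transaction records how many standard immunization shots the vaccine lot contained initially.
What I am trying to do here is calculate the correct 'Quantity_on_hand' for a vaccine lot (that is, how many standard immunization shots the lot still has left).
Here is an example using the data we just inserted. We have one vaccine lot (lot ID '100') that started with 150 standard shots (that is, it contained 150 shots of 0.2cc each). Two immunization shots have been given from this lot, one of 0.2cc and the other of 0.3cc. The current quantity of 120 is clearly wrong, and we need to recalculate and update it.
Here is my query: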
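To make the expected result concrete, the arithmetic for the sample data can be sketched in plain Java. This is only an illustration under an assumption the question implies but does not state as a formula: the remaining quantity is the initial number of standard shots minus the total dose given divided by the standard dose. The class and method names are hypothetical, and the two-decimal scale is chosen to mirror the NUMBER(12,2) column.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.List;

// Hypothetical helper, not part of the original schema or query.
class QuantityOnHand {

    // Assumed formula: initialShots - (sum of dose magnitudes / standard dose).
    static BigDecimal remainingStandardShots(BigDecimal initialShots,
                                             BigDecimal standardDose,
                                             List<BigDecimal> doseMagnitudes) {
        // Total volume actually given across all immunizations for the lot.
        BigDecimal totalUsed = doseMagnitudes.stream()
                .reduce(BigDecimal.ZERO, BigDecimal::add);
        // Convert the volume into a number of standard shots (scale 2, like NUMBER(12,2)).
        BigDecimal shotsUsed = totalUsed.divide(standardDose, 2, RoundingMode.HALF_UP);
        return initialShots.subtract(shotsUsed);
    }

    public static void main(String[] args) {
        BigDecimal result = remainingStandardShots(
                new BigDecimal("150"),                       // vaccine_lot_transaction.quantity
                new BigDecimal("0.2"),                       // vaccine_lot.dose
                List.of(new BigDecimal("0.2"), new BigDecimal("0.3"))); // immunization.dose_magnitude
        System.out.println(result); // prints 147.50
    }
}
```

Under that assumption, lot 100 should end up with 147.5 standard shots on hand rather than 120.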
UPDATE vaccine_lot V SET quantity_on_hand =
(
(
(SELECT …

I have configured several Kafka consumers in Spring Boot. This is what kafka.properties looks like (only one consumer's configuration is listed here):
kafka.topics=
bootstrap.servers=
group.id=
enable.auto.commit=
auto.commit.interval.ms=
session.timeout.ms=
schema.registry.url=
auto.offset.reset=
kafka.enabled=
Here is the configuration:
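import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.env.Environment;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
@PropertySource({"classpath:kafka.properties"})
public class KafkaConsumerConfig {

    @Autowired
    private Environment env;

    // Builds the consumer factory from the values in kafka.properties.
    @Bean
    public ConsumerFactory<String, String> pindropConsumerFactory() {
        Map<String, Object> dataRiverProps = new HashMap<>();
        dataRiverProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("bootstrap.servers"));
        dataRiverProps.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty("group.id"));
        dataRiverProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, env.getProperty("enable.auto.commit"));
        dataRiverProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, env.getProperty("auto.commit.interval.ms"));
        dataRiverProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, env.getProperty("session.timeout.ms"));
        dataRiverProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        dataRiverProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        dataRiverProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, env.getProperty("auto.offset.reset"));
        return new DefaultKafkaConsumerFactory<>(dataRiverProps);
    }

    // Container factory used by the @KafkaListener methods.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(pindropConsumerFactory());
        return factory;
    }
}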
Here is the consumer:
@Component
public …

I set up my project with Spring Boot and Spring Kafka, and it has three consumers. Looking at the logs, I can see that the consumers disconnect from time to time:
catalina.out:2019-04-27 02:19:57.962 INFO 18245 --- [ntainer#2-0-C-1] o.a.kafka.clients.FetchSessionHandler : [Consumer clientId=consumer-2, groupId=FalconDataRiver1] Error sending fetch request (sessionId=1338157432, epoch=205630) to node 101: org.apache.kafka.common.errors.DisconnectException.
catalina.out:2019-04-27 02:19:57.962 INFO 18245 --- [ntainer#4-0-C-1] o.a.kafka.clients.FetchSessionHandler : [Consumer clientId=consumer-6, groupId=FalconDataRiver1] Error sending fetch request (sessionId=727942178, epoch=234691) to node 101: org.apache.kafka.common.errors.DisconnectException.
catalina.out:2019-04-27 02:19:57.962 INFO 18245 --- [ntainer#0-0-C-1] o.a.kafka.clients.FetchSessionHandler : [Consumer clientId=consumer-10, groupId=FalconDataRiver1] Error sending fetch request (sessionId=836405004, epoch=234351) to node 101: org.apache.kafka.common.errors.DisconnectException.
catalina.out:2019-04-27 02:19:58.023 INFO 18245 --- [ntainer#1-0-C-1] o.a.kafka.clients.FetchSessionHandler : [Consumer clientId=consumer-12, groupId=FalconDataRiver1] Error sending fetch …

Using Spring Boot, I am trying to set up my Kafka consumer in batch-receive mode:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, GenericData.Record> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, GenericData.Record> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setMessageConverter(new StringJsonMessageConverter()); // I know this one won't work
    factory.setBatchListener(true);
    return factory;
}

@Bean
public ConsumerFactory<GenericData.Record, GenericData.Record> consumerFactory() {
    Map<String, Object> dataRiverProps = getDataRiverProps();
    dataRiverProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("bootstrap.servers"));
    return new DefaultKafkaConsumerFactory<>(dataRiverProps);
}
This is what the actual consumer looks like:
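@KafkaListener(topics = "#{'${kafka.topics}'.split(',')}", containerFactory = "kafkaListenerContainerFactory")
public void consumeAvro(List<GenericData.Record> list, Acknowledgment ack) {
    messageProcessor.addMessageBatchToExecutor(list);
    // Wait until the executor backlog drops below the threshold before taking the next batch.
    while (messageProcessor.getTaskSize() > EXECUTOR_TASK_COUNT_THRESHOLD) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            // InterruptedException normally has no cause, so log the exception itself.
            LOGGER_ERROR.error(ExceptionUtils.getStackTrace(e));
            // Restore the interrupt flag and stop waiting instead of spinning.
            Thread.currentThread().interrupt();
            break;
        }
    }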
} …
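The listener above throttles itself by sleeping while the executor backlog is above a threshold. The same waiting pattern can be sketched in plain Java without Spring or Kafka. This is only an illustration: the class name, the method name, and the use of the executor's queue size as the backlog measure are assumptions, since messageProcessor.getTaskSize() is not shown in the question.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper illustrating the polling-based backpressure loop.
class BacklogThrottle {

    // Blocks until the executor's queue holds at most `threshold` tasks.
    // Returns false if the waiting thread was interrupted before that happened.
    static boolean awaitBacklogAtMost(ThreadPoolExecutor executor, int threshold, long pollMillis) {
        while (executor.getQueue().size() > threshold) {
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller and stop waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

        // Simulate one batch of 50 messages handed to the executor.
        for (int i = 0; i < 50; i++) {
            executor.execute(() -> {
                try {
                    Thread.sleep(5); // stand-in for real message processing
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // Do not accept the next batch until the backlog has drained to 10 or fewer tasks.
        boolean ready = awaitBacklogAtMost(executor, 10, 20L);
        System.out.println("ready for next batch: " + ready);

        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

Keep in mind that while a listener thread is blocked in a loop like this, it is not polling Kafka, so a long wait may interact with the consumer's poll and session timeout settings.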