当有多个消费者时,我无法收听 kafka 主题(我的案例 2 主题)。在下面的示例中,我有 2 个消费者工厂,它们将接受 2 个不同的 JSON 消息(一个是用户类型,另一个是事件类型)。两条消息都发布到不同的主题。在这里,当我尝试访问 topic1 中的事件消息时,我无法访问,但可以访问用户主题消息。
前任:
@Configuration
@EnableKafka
public class KafkaConsumerConfiguration {
@Autowired
private Environment environment;
@Bean
public ConsumerFactory<String,User> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("bootstrap.servers"));
config.put(ConsumerConfig.GROUP_ID_CONFIG, environment.getProperty("user.consumer.group"));
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
new JsonDeserializer<>(User.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, User> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory<String , Event> consumerFactoryEvent(){
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("bootstrap.servers")); …Run Code Online (Sandbox Code Playgroud) 我试图找到2个字典列表之间的区别.我在这个论坛中找到了一些信息,但没有达到我的目的.
incoming_rows = [{'column_name': 'LOAD_ID', 'data_type': 'int', 'table_name': 'CONFIG'},
{'column_name': 'ROW_NUMBER', 'data_type': 'int', 'table_name': 'CONFIG'},
{'column_name': 'CREATE_DATE', 'data_type': 'VARCHAR(20)', 'table_name': 'CONFIG'},
{'column_name': 'CONFIG_TYPE', 'data_type': 'varchar(1)', 'table_name': 'CONFIG'},
{'column_name': 'CONFIG_ID', 'data_type': 'numeric(10,0)', 'table_name': 'CONFIG'}
]
available_row = [{'column_name': 'LOAD_ID', 'data_type': 'int', 'table_name': 'CONFIG'},
{'column_name': 'ROW_NUMBER', 'data_type': 'int', 'table_name': 'CONFIG'},
{'column_name': 'CREATE_DATE', 'data_type': 'date', 'table_name': 'CONFIG'}
]
Run Code Online (Sandbox Code Playgroud)
在这里,我需要将incoming_rows与available_row字典列表进行比较,差异想要列在另一个dict格式列表中.这里我的表名是唯一的.条件:1.任何新添加的列.2.数据类型的任何更改如果这两个条件为真,则expected_row应仅包含这些已更改的行.
# expected output
expected_row=[{'column_name': 'CONFIG_TYPE', 'data_type': 'varchar(1)', 'table_name': 'CONFIG'},
{'column_name': 'CONFIG_ID', 'data_type': 'numeric(10,0)', 'table_name': 'CONFIG'},
{'column_name': 'CREATE_DATE', 'data_type': 'VARCHAR(20)', 'table_name': 'CONFIG'}
]
Run Code Online (Sandbox Code Playgroud)