我目前正在使用Spring Integration Kafka进行实时统计.但是,组名使Kafka搜索了侦听器未读取的所有先前值.
@Value("${kafka.consumer.group.id}")
private String consumerGroupId;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(getDefaultProperties());
}
public Map<String, Object> getDefaultProperties() {
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
return properties;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public KafkaMessageListener listener() {
return new KafkaMessageListener();
}
Run Code Online (Sandbox Code Playgroud)
我想开始最新的偏移,而不是被旧的价值所困扰.是否有可能重置组的偏移量?
在 Flink 文档的任何地方,我都看到状态对于地图功能和工作人员来说是独立的。这在独立的方法中似乎很强大,但是如果 Flink 在集群中运行呢?Flink 可以处理所有工作人员都可以添加数据并查询的全局状态吗?
来自 Flink 关于状态的文章:
为了在此设置中实现高吞吐量和低延迟,必须最小化任务之间的网络通信。在 Flink 中,流处理的网络通信只发生在作业算子图中的逻辑边上(垂直),因此流数据可以从上游传输到下游算子。
但是,操作符的并行实例之间没有通信(水平)。为了避免这种网络通信,数据局部性是 Flink 中的一个关键原则,它强烈影响状态的存储和访问方式。
我已经使用 Django Channels 工作了一个星期了,并行性让我有些烦恼runworker。
例如,我有一个 MQTT 客户端,它在收到消息时在通道中发布,基本。
async def treat_message(msg):
channel_layer = get_channel_layer()
payload = json.loads(msg.payload, encoding="utf-8")
await channel_layer.send("mqtt", {
"type": "value.change",
"message": payload
})
Run Code Online (Sandbox Code Playgroud)
这个送的很好啊 我想发送多少就发送多少,它会发送到redis队列中。到频道mqtt。
然后我运行工作程序,它将重定向队列中的消息mqtt:
python manage.py runworker mqtt
2018-09-12 16:33:42,232 - INFO - runworker - Running worker for channels ['mqtt']
Run Code Online (Sandbox Code Playgroud)
这就是问题开始的地方。以下是 AsyncConsumer 读取数据的内容:
class MQTTConsumer(AsyncConsumer):
async def value_change(self, event):
await asyncio.sleep(5)
print("I received changes : {}".format(event["message"]))
Run Code Online (Sandbox Code Playgroud)
我睡觉是为了模拟任务的业务。这就是我要去的地方:异步消费者不是多线程的!当我向通道发送两条消息时,消费者需要 10 秒来处理第二条消息,而不是多线程时的 5 秒。如下所示。
2018-09-12 16:45:25,271 - INFO - runworker - Running worker …Run Code Online (Sandbox Code Playgroud) 在其他语言(如 Java)中,库可用于将对象字段映射到另一个对象(如mapstruct)。它对于将控制器和服务彼此隔离确实很有用。
PersonDto personDto = mapper.businessToDto(personBusiness);
Run Code Online (Sandbox Code Playgroud)
我找不到如何用 Rust 来做到这一点?我没有发现任何图书馆可以帮助解决这个问题,也没有任何方法可以做到这一点。任何资源将不胜感激!
apache-flink ×1
apache-kafka ×1
django ×1
java ×1
python ×1
python-3.x ×1
rust ×1
spring ×1
spring-kafka ×1