小编Bac*_*hrc的帖子

Spring Kafka - 如何使用组ID将偏移重置为最新?

我目前正在使用Spring Integration Kafka进行实时统计.但是,组名使Kafka搜索了侦听器未读取的所有先前值.

@Value("${kafka.consumer.group.id}")
private String consumerGroupId;

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(getDefaultProperties());
}

public Map<String, Object> getDefaultProperties() {
    Map<String, Object> properties = new HashMap<>();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

    properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);

    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    return properties;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

@Bean
public KafkaMessageListener listener() {
    return new KafkaMessageListener();
}
Run Code Online (Sandbox Code Playgroud)

我想开始最新的偏移,而不是被旧的价值所困扰.是否有可能重置组的偏移量?

java spring spring-integration apache-kafka spring-kafka

7
推荐指数
2
解决办法
7998
查看次数

Flink 中是否可以使用多个 worker 的全局状态?

在 Flink 文档的任何地方,我都看到状态对于地图功能和工作人员来说是独立的。这在独立的方法中似乎很强大,但是如果 Flink 在集群中运行呢?Flink 可以处理所有工作人员都可以添加数据并查询的全局状态吗?

来自 Flink 关于状态的文章:

为了在此设置中实现高吞吐量和低延迟,必须最小化任务之间的网络通信。在 Flink 中,流处理的网络通信只发生在作业算子图中的逻辑边上(垂直),因此流数据可以从上游传输到下游算子。

但是,操作符的并行实例之间没有通信(水平)。为了避免这种网络通信,数据局部性是 Flink 中的一个关键原则,它强烈影响状态的存储和访问方式。

apache-flink

5
推荐指数
1
解决办法
2649
查看次数

如何使用 Django Channels 进行多线程 AsyncConsumer

我已经使用 Django Channels 工作了一个星期了,并行性让我有些烦恼runworker

例如,我有一个 MQTT 客户端,它在收到消息时在通道中发布,基本。

async def treat_message(msg):
    channel_layer = get_channel_layer()
    payload = json.loads(msg.payload, encoding="utf-8")

    await channel_layer.send("mqtt", {
        "type": "value.change",
        "message": payload
    })
Run Code Online (Sandbox Code Playgroud)

这个送的很好啊 我想发送多少就发送多少,它会发送到redis队列中。到频道mqtt

然后我运行工作程序,它将重定向队列中的消息mqtt

python manage.py runworker mqtt
2018-09-12 16:33:42,232 - INFO - runworker - Running worker for channels ['mqtt']
Run Code Online (Sandbox Code Playgroud)

这就是问题开始的地方。以下是 AsyncConsumer 读取数据的内容:

class MQTTConsumer(AsyncConsumer):
    async def value_change(self, event):
        await asyncio.sleep(5)
        print("I received changes : {}".format(event["message"]))
Run Code Online (Sandbox Code Playgroud)

我睡觉是为了模拟任务的业务。这就是我要去的地方:异步消费者不是多线程的!当我向通道发送两条消息时,消费者需要 10 秒来处理第二条消息,而不是多线程时的 5 秒。如下所示。

2018-09-12 16:45:25,271 - INFO - runworker - Running worker …
Run Code Online (Sandbox Code Playgroud)

python django multithreading python-3.x django-channels

5
推荐指数
1
解决办法
2204
查看次数

在 Rust 中,如何将一个结构映射到另一个结构?

在其他语言(如 Java)中,库可用于将对象字段映射到另一个对象(如mapstruct)。它对于将控制器和服务彼此隔离确实很有用。

PersonDto personDto = mapper.businessToDto(personBusiness);
Run Code Online (Sandbox Code Playgroud)

我找不到如何用 Rust 来做到这一点?我没有发现任何图书馆可以帮助解决这个问题,也没有任何方法可以做到这一点。任何资源将不胜感激!

rust

4
推荐指数
1
解决办法
1162
查看次数