小编Mac*_*ież的帖子

如何在使用Jackson反序列化OffsetDateTime时保留偏移量

在传入的JSON中,我有一个符合ISO8601标准的日期时间字段,包含区域偏移量.我想保留这个偏移量,但不幸的是Jackson在反序列化这个字段时默认为GMT/UTC(我从http://wiki.fasterxml.com/JacksonFAQDateHandling中了解到).

@RunWith(JUnit4.class)
public class JacksonOffsetDateTimeTest {

    private ObjectMapper objectMapper;

    @Before
    public void init() {
        objectMapper = Jackson2ObjectMapperBuilder.json()
            .modules(new JavaTimeModule())
            .featuresToDisable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
            .build();
    }

    @Test
    public void test() throws IOException {
        final String json = "{ \"date\": \"2000-01-01T12:00:00.000-04:00\" }";
        final JsonType instance = objectMapper.readValue(json, JsonType.class);

        assertEquals(ZoneOffset.ofHours(-4), instance.getDate().getOffset());
    }
}


public class JsonType {
    private OffsetDateTime date;

    // getter, setter
}
Run Code Online (Sandbox Code Playgroud)

我在这里得到的是:

java.lang.AssertionError: expected:<-04:00> but was:<Z>
Run Code Online (Sandbox Code Playgroud)

如何使返回的OffsetDateTime包含原始偏移量?

我在Jackson 2.8.3上.

java json jackson java-time

7
推荐指数
1
解决办法
3635
查看次数

使用spring-kafka保证消息顺序的指数退避

我正在尝试实现一个基于Spring Boot的Kafka消费者,它具有一些非常强大的消息传递保证,即使在出现错误的情况下也是如此.

  • 必须按顺序处理来自分区的消息,
  • 如果消息处理失败,应暂停特定分区的消耗,
  • 应该通过退避重试处理,直到成功为止.

我们当前的实施满足以下要求:

  @Bean
  public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRetryTemplate(retryTemplate());

    final ContainerProperties containerProperties = factory.getContainerProperties();
    containerProperties.setAckMode(AckMode.MANUAL_IMMEDIATE);
    containerProperties.setErrorHandler(errorHandler());

    return factory;
  }

  @Bean
  public RetryTemplate retryTemplate() {

    final ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(1000);
    backOffPolicy.setMultiplier(1.5);

    final RetryTemplate template = new RetryTemplate();
    template.setRetryPolicy(new AlwaysRetryPolicy());    
    template.setBackOffPolicy(backOffPolicy);

    return template;
  }

  @Bean
  public ErrorHandler errorHandler() {
    return new SeekToCurrentErrorHandler();
  }
Run Code Online (Sandbox Code Playgroud)

但是,在这里,记录永远被消费者锁定.在某些时候,处理时间将超过max.poll.interval.ms,服务器将分区重新分配给其他消费者,从而创建一个副本.

假设max.poll.interval.ms等于5分钟(默认)并且故障持续30分钟,这将导致消息被处理ca. 6次.

另一种可能性是在N次重试(例如3次尝试)之后通过使用将消息返回到队列SimpleRetryPolicy.然后,将重播该消息(感谢SeekToCurrentErrorHandler)并且处理将从头开始,最多再次尝试5次.这导致延迟形成一系列例如

10 secs …
Run Code Online (Sandbox Code Playgroud)

messaging apache-kafka spring-retry spring-kafka exponential-backoff

7
推荐指数
1
解决办法
4565
查看次数