小编Gli*_*ide的帖子

即使没有提交偏移量,Consumer.poll()也会返回新记录?

如果我有一个enable.auto.commit=false和我打电话consumer.poll()而没有打电话consumer.commitAsync(),为什么consumer.poll()下次打电话时会返回新记录?

由于我没有提交我的偏移量,我希望poll()能够返回最新的偏移量,这应该是相同的记录.

我问,因为我在处理过程中试图处理故障情况.我希望不提交偏移量,poll()将再次返回相同的记录,以便我可以再次重新处理这些失败的记录.

public class MyConsumer implements Runnable {
    @Override
    public void run() {
        while (true) {
            ConsumerRecords<String, LogLine> records = consumer.poll(Long.MAX_VALUE);
            for (ConsumerRecord record : records) {
                try {
                   //process record
                   consumer.commitAsync();
                } catch (Exception e) {
                }
                /**
                If exception happens above, I was expecting poll to return new records so I can re-process the record that caused the exception. 
                **/
            }

        }
    }
}
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-consumer-api

9
推荐指数
1
解决办法
2654
查看次数

KafkaAvroDeserializer不返回SpecificRecord但返回GenericRecord

KafkaProducer可以使用KafkaAvroSerializer序列化对象到我的主题.但是,KafkaConsumer.poll()返回反GenericRecord序列化而不是我的序列化类.

MyKafkaProducer

 KafkaProducer<CharSequence, MyBean> producer;
    try (InputStream props = Resources.getResource("producer.props").openStream()) {
      Properties properties = new Properties();
      properties.load(props);
      properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          io.confluent.kafka.serializers.KafkaAvroSerializer.class);
      properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          io.confluent.kafka.serializers.KafkaAvroSerializer.class);
      properties.put("schema.registry.url", "http://localhost:8081");

      MyBean bean = new MyBean();
      producer = new KafkaProducer<>(properties);
      producer.send(new ProducerRecord<>(topic, bean.getId(), bean));
Run Code Online (Sandbox Code Playgroud)

我的KafkaConsumer

 try (InputStream props = Resources.getResource("consumer.props").openStream()) {
      properties.load(props);
      properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
      properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
      properties.put("schema.registry.url", "http://localhost:8081");
      consumer = new KafkaConsumer<>(properties);
    }
    consumer.subscribe(Arrays.asList(topic));
    try {
      while (true) {
        ConsumerRecords<CharSequence, MyBean> records = consumer.poll(100);
        if (records.isEmpty()) {
          continue;
        }
        for …
Run Code Online (Sandbox Code Playgroud)

java avro apache-kafka confluent confluent-schema-registry

8
推荐指数
1
解决办法
4124
查看次数

减少执行者核心数量会减少执行者内存吗?

我的Spark作业因YARN错误而失败Container killed by YARN for exceeding memory limits 10.0 GB of 10 GB physical memory used

凭直觉,我将核心数从减少51,并且作业成功完成。

我没有增加,executor-memory因为这10g是我的YARN群集的最大值。

我只是想确认我的直觉。减少executor-cores消费数量会减少executor-memory吗?如果是这样,为什么?

hadoop-yarn apache-spark

8
推荐指数
1
解决办法
189
查看次数

vim DelimitMate

使用DelimitMate,它会自动生成结束括号.当我在括号内完成输入时,我按哪些键击以快速转到右括号的右侧?(现在我必须手动按ESC然后'a')

vim

7
推荐指数
4
解决办法
4645
查看次数

XCode是Eclipse QuickFix/Intellij的Show Action Intetion的等效快捷方式

在Eclipse和的IntelliJ,当你有一个警告/错误,你可以做Ctrl+1Eclipse中Alt+Enter的IntelliJ弹出建议的操作和快速修复功能的菜单.

XCode中的等效快捷方式是什么?

eclipse xcode intellij-idea ios xcode5

7
推荐指数
0
解决办法
607
查看次数

是否有一种编程方式来了解如何创建Spring bean?

是否有一种编程方式来找出哪个Configuration类或xml文件创建了一个Spring bean?而不是挖掘代码来弄清楚.

java spring configuration-files

7
推荐指数
1
解决办法
790
查看次数

是否没有必要验证与Mockito中被模拟的方法相同的方法?

我经常看到相同的方法被验证为被模拟的方法Mockito(下面的例子).Mockito.verify()在这些情况下打电话有什么额外的好处吗?

//mock method
FooService fs = mock(FooService.class);
when(fs.getFoo()).thenReturn("foo");

//method under test
fs.doSomething();

//verify method
verify(fs).getFoo();
Run Code Online (Sandbox Code Playgroud)

如果fs.getFoo()未调用,该方法应该失败.为什么打电话verify?如果您需要ArgumentCaptor在验证中使用断言参数,我会看到好处; 除了ArgumentCaptor案,它是否只是不必要的?

java unit-testing mockito

7
推荐指数
1
解决办法
624
查看次数

为什么在Cassandra中拥有大型分区如此糟糕?

我到处都看到了这个警告,但是找不到关于这个主题的任何详细解释.

cassandra

7
推荐指数
1
解决办法
2353
查看次数

如何在IntelliJ中一次对多个错误应用“快速修复”?

在IntelliJ中,按Alt+Enter一个错误会弹出一个对话框,显示一个修正,该修正在您按时被应用Enter。有没有办法对文件中多个相同类型的错误执行此操作?

intellij-idea

6
推荐指数
1
解决办法
1257
查看次数

有关在Spring单例作用域服务中使用ThreadLocal的问题

在下面的单例作用域服务类中,类中的所有方法都需要一些在Service.doA()调用时已知的用户上下文.我没有在方法中传递信息,而是考虑将这些值存储在TheadLocal.关于这种方法,我有两个问题:

1)以下实施是否ThreadLocal正确使用?也就是说,它是线程安全的,正确的值将被读/写入ThreadLocal

2)是否ThreadLocal userInfo需要明确清理以防止任何内存泄漏?它会被垃圾收集吗?

@Service
public class Service {
    private static final ThreadLocal<UserInfo> userInfo = new ThreadLocal<>(); 

    public void doA() {
        // finds user info
        userInfo.set(new UserInfo(userId, name));
        doB();
        doC();
    }

    private void doB() {
        // needs user info
        UserInfo userInfo = userInfo.get();
    }

    private void doC() {
        // needs user info
        UserInfo userInfo = userInfo.get();
    }
}
Run Code Online (Sandbox Code Playgroud)

java concurrency spring multithreading thread-local

6
推荐指数
1
解决办法
4876
查看次数