如果我有一个enable.auto.commit=false和我打电话consumer.poll()而没有打电话consumer.commitAsync(),为什么consumer.poll()下次打电话时会返回新记录?
由于我没有提交我的偏移量,我希望poll()能够返回最新的偏移量,这应该是相同的记录.
我问,因为我在处理过程中试图处理故障情况.我希望不提交偏移量,poll()将再次返回相同的记录,以便我可以再次重新处理这些失败的记录.
public class MyConsumer implements Runnable {
@Override
public void run() {
while (true) {
ConsumerRecords<String, LogLine> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord record : records) {
try {
//process record
consumer.commitAsync();
} catch (Exception e) {
}
/**
If exception happens above, I was expecting poll to return new records so I can re-process the record that caused the exception.
**/
}
}
}
}
Run Code Online (Sandbox Code Playgroud) 我KafkaProducer可以使用KafkaAvroSerializer序列化对象到我的主题.但是,KafkaConsumer.poll()返回反GenericRecord序列化而不是我的序列化类.
MyKafkaProducer
KafkaProducer<CharSequence, MyBean> producer;
try (InputStream props = Resources.getResource("producer.props").openStream()) {
Properties properties = new Properties();
properties.load(props);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
properties.put("schema.registry.url", "http://localhost:8081");
MyBean bean = new MyBean();
producer = new KafkaProducer<>(properties);
producer.send(new ProducerRecord<>(topic, bean.getId(), bean));
Run Code Online (Sandbox Code Playgroud)
我的KafkaConsumer
try (InputStream props = Resources.getResource("consumer.props").openStream()) {
properties.load(props);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
properties.put("schema.registry.url", "http://localhost:8081");
consumer = new KafkaConsumer<>(properties);
}
consumer.subscribe(Arrays.asList(topic));
try {
while (true) {
ConsumerRecords<CharSequence, MyBean> records = consumer.poll(100);
if (records.isEmpty()) {
continue;
}
for …Run Code Online (Sandbox Code Playgroud) 我的Spark作业因YARN错误而失败Container killed by YARN for exceeding memory limits 10.0 GB of 10 GB physical memory used。
凭直觉,我将核心数从减少5到1,并且作业成功完成。
我没有增加,executor-memory因为这10g是我的YARN群集的最大值。
我只是想确认我的直觉。减少executor-cores消费数量会减少executor-memory吗?如果是这样,为什么?
使用DelimitMate,它会自动生成结束括号.当我在括号内完成输入时,我按哪些键击以快速转到右括号的右侧?(现在我必须手动按ESC然后'a')
是否有一种编程方式来找出哪个Configuration类或xml文件创建了一个Spring bean?而不是挖掘代码来弄清楚.
我经常看到相同的方法被验证为被模拟的方法Mockito(下面的例子).Mockito.verify()在这些情况下打电话有什么额外的好处吗?
//mock method
FooService fs = mock(FooService.class);
when(fs.getFoo()).thenReturn("foo");
//method under test
fs.doSomething();
//verify method
verify(fs).getFoo();
Run Code Online (Sandbox Code Playgroud)
如果fs.getFoo()未调用,该方法应该失败.为什么打电话verify?如果您需要ArgumentCaptor在验证中使用断言参数,我会看到好处; 除了ArgumentCaptor案,它是否只是不必要的?
在IntelliJ中,按Alt+Enter一个错误会弹出一个对话框,显示一个修正,该修正在您按时被应用Enter。有没有办法对文件中多个相同类型的错误执行此操作?
在下面的单例作用域服务类中,类中的所有方法都需要一些在Service.doA()调用时已知的用户上下文.我没有在方法中传递信息,而是考虑将这些值存储在TheadLocal.关于这种方法,我有两个问题:
1)以下实施是否ThreadLocal正确使用?也就是说,它是线程安全的,正确的值将被读/写入ThreadLocal?
2)是否ThreadLocal userInfo需要明确清理以防止任何内存泄漏?它会被垃圾收集吗?
@Service
public class Service {
private static final ThreadLocal<UserInfo> userInfo = new ThreadLocal<>();
public void doA() {
// finds user info
userInfo.set(new UserInfo(userId, name));
doB();
doC();
}
private void doB() {
// needs user info
UserInfo userInfo = userInfo.get();
}
private void doC() {
// needs user info
UserInfo userInfo = userInfo.get();
}
}
Run Code Online (Sandbox Code Playgroud) java ×4
apache-kafka ×2
spring ×2
apache-spark ×1
avro ×1
cassandra ×1
concurrency ×1
confluent ×1
eclipse ×1
hadoop-yarn ×1
ios ×1
mockito ×1
thread-local ×1
unit-testing ×1
vim ×1
xcode ×1
xcode5 ×1