小编Iho*_* M.的帖子

卡夫卡重新平衡.重复处理问题

我有一个消费者工作者应用程序,内部正在启动X线程数,每个线程产生它的KafkaCosnumer.Cosnumers拥有groupId相同的主题并订阅相同的主题.因此,每个消费者都可以获得公平的分区份额.

处理的本质是我不能丢失消息,也不能允许重复.我正在运行的kafka版本是0.10.2.1.

这是我面临的问题:消费者线程1开始消费消息,然后poll()获取一批消息.我也实现了ConsumerRebalanceListener,所以每次成功处理消息时,它都会被添加到offsets地图中.(请参阅下面的代码.)因此,一旦重新平衡发生,我可以在将分区重新分配给其他使用者之前提交我的偏移量.有时,为了处理该批处理,需要更长的时间max.poll.interval.ms,这是重新平衡发生的地方,分区从消费者1中提取并分配给消费者2.消费者1不知道分区被撤销并继续处理消息,在同时,消费者2从最后一个偏移量(由RebalanceListener提交)中获取并处理相同的消息.

有没有办法通知消费者他已撤销分区,以便他可以停止处理已经分配给其他消费者的循环中的消息?

public class RebalanceListener<K, V> implements ConsumerRebalanceListener {

    private final KafkaConsumer<K, V> consumer;

    private static final ConcurrentMap<TopicPartition, OffsetAndMetadata> CURRENT_OFFSETS =
            Maps.newConcurrentMap();

    private static final Logger LOGGER = LoggerFactory.getLogger(RebalanceListener.class);

    public RebalanceListener(KafkaConsumer<K, V> consumer) {
        this.consumer = consumer;
    }

    public void addOffset(String topic, int partition, long offset) {
        LOGGER.debug("message=Adding offset to offsets map, topic={}, partition={}, offset={}",
                topic, partition, offset);
        CURRENT_OFFSETS.put(new TopicPartition(topic, partition),
                new OffsetAndMetadata(offset, "commit"));
    }

    public …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka kafka-consumer-api

10
推荐指数
1
解决办法
3454
查看次数

Intellij + Ajc + Lombok/Mapstruct

我想在我的 IntelliJ Idea 中启用 AspectJ 编译器,因为我想在编译时编织几个方面。

同时,我在我的代码库中使用 Lombok 和 Mapstruct。

这两个需要额外的注释处理,这必须在ajc开始之前发生。我为 Lombok 和 Mapstruct 安装了两个插件。它们独立工作正常,正在生成源代码。但是当我启用ajc并勾选Enable annotation processing options,然后构建一个项目时,我得到:

Error:(9, 0) ajc: Internal error in the mapping processor: java.lang.NullPointerException   at org.aspectj.org.eclipse.jdt.internal.compiler.lookup.FieldBinding.sourceField(FieldBinding.java:425)     at org.aspectj.org.eclipse.jdt.internal.compiler.apt.model.TypeElementImpl$SourceLocationComparator.determineSourceStart(TypeElementImpl.java:108)      at org.aspectj.org.eclipse.jdt.internal.compiler.apt.model.TypeElementImpl$SourceLocationComparator.getSourceStart(TypeElementImpl.java:72)     at org.aspectj.org.eclipse.jdt.internal.compiler.apt.model.TypeElementImpl$SourceLocationComparator.compare(TypeElementImpl.java:65)    at org.aspectj.org.eclipse.jdt.internal.compiler.apt.model.TypeElementImpl$SourceLocationComparator.compare(TypeElementImpl.java:1)     at java.util.TimSort.countRunAndMakeAscending(TimSort.java:360)     at java.util.TimSort.sort(TimSort.java:234)     at java.util.Arrays.sort(Arrays.java:1512)      at java.util.ArrayList.sort(ArrayList.java:1462)    at java.util.Collections.sort(Collections.java:175)     at org.aspectj.org.eclipse.jdt.internal.compiler.apt.model.TypeElementImpl.getEnclosedElements(TypeElementImpl.java:166)    at org.mapstruct.ap.internal.util.workarounds.SpecificCompilerWorkarounds.replaceTypeElementIfNecessary(SpecificCompilerWorkarounds.java:99)    at org.mapstruct.ap.internal.util.Executables.getAllEnclosedExecutableElements(Executables.java:99)     at org.mapstruct.ap.internal.model.common.Type.getAllMethods(Type.java:633)     at org.mapstruct.ap.internal.model.common.Type.getPropertyReadAccessors(Type.java:496)      at org.mapstruct.ap.internal.model.BeanMappingMethod$Builder.build(BeanMappingMethod.java:168)      at org.mapstruct.ap.internal.processor.MapperCreationProcessor.getMappingMethods(MapperCreationProcessor.java:376)      at org.mapstruct.ap.internal.processor.MapperCreationProcessor.getMapper(MapperCreationProcessor.java:151)      at org.mapstruct.ap.internal.processor.MapperCreationProcessor.process(MapperCreationProcessor.java:122)    at org.mapstruct.ap.internal.processor.MapperCreationProcessor.process(MapperCreationProcessor.java:76)     at org.mapstruct.ap.MappingProcessor.process(MappingProcessor.java:283)     at org.mapstruct.ap.MappingProcessor.processMapperTypeElement(MappingProcessor.java:263)    at org.mapstruct.ap.MappingProcessor.processMapperElements(MappingProcessor.java:221) …
Run Code Online (Sandbox Code Playgroud)

aspectj intellij-idea mapstruct intellij-lombok-plugin

8
推荐指数
2
解决办法
2382
查看次数

在 Kafka 中设计消息键的最佳方法是什么?

我有一个分区主题,其中有X分区。

截至目前,在生成消息时,我ProducerRecord只创建了 Kafka 的指定topicvalue. 我没有定义key. 据我所知,我的消息将使用默认的内置分区器在分区之间均匀分布。另一方面,我有一个 Kafka 消费者线程池。每个 Kafka 消费者都将在其自己的专用线程中运行,以消费来自主题的消息。这些消费者中的每一个都被赋予相同的group.id. 这将允许并行使用消息。每个消费者都将被分配其公平份额的分区以供读取。

我希望我的消息以有序的方式被消费。我知道 Kafka 保证了分区内消息的顺序。所以,只要我想出一个合适的密钥结构,我就会对我的消息进行分区,使它们最终在同一个分区中。在某种程度上,消息键将消息分组并将它们存储在分区中。

是否有意义?

问:是否有可能由于设计不当的密钥而导致分区不均匀?一个人可能会收到比其他人更多的记录。它会以一种糟糕的方式影响我的 Kafka 集群的性能吗?消息密钥设计的最佳实践是什么?

multithreading multiprocessing apache-kafka kafka-consumer-api kafka-producer-api

7
推荐指数
1
解决办法
8372
查看次数

Kafka客户端连接池

执行kafka客户端的生产者/消费者连接池是否有意义?

kafka是否在内部维护一个已初始化并可以使用的连接对象列表?

我们希望最小化连接创建的时间,以便在发送/接收消息时不会产生额外的开销.

目前我们正在使用apache commons-pool库GenericObjectPool来保持连接.

任何帮助将不胜感激.

java apache-kafka

6
推荐指数
1
解决办法
912
查看次数

Spring Boot2。异步API。CompletableFuture与反应式

我的应用程序严重依赖异步Web服务。它是用Spring Boot 1.5.x构建的,它使我可以利用标准Java 8 CompletableFuture<T>来产生延迟的异步响应。有关更多信息,请参见 https://nickebbitt.github.io/blog/2017/03/22/async-web-service-using-completable-future

Spring Boot 2.0.x现在带有可以利用反应范例的入门包。Spring WebFlux是框架,用于实现响应式HTTP。

由于我已经按照第一段所述实现了API,因此通过重做服务以使用非阻塞反应式方法,我能获得很多收益吗?简而言之,我还将拥有非阻塞API,对吧?

有一个例子如何在基于异步API转换CompletableFuture<T>Mono<T>\Flux<T>

我当时正在考虑完全摆脱基于servlet的服务器(在我的情况下为Jetty),并选择Netty + Reactor。

不用说,我对整个反应式范式都是陌生的。

我想听听您的意见。

spring spring-boot project-reactor completable-future spring-webflux

6
推荐指数
1
解决办法
1894
查看次数

如何从 ConstraintViolationException 获取查询参数名称

我有一个服务方法:

 @GetMapping(path = "/api/some/path", produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<?> getWhatever(@RequestParam(value = "page-number", defaultValue = "0") @Min(0) Integer pageNumber, ...
Run Code Online (Sandbox Code Playgroud)

如果 API 的调用者没有为page-number查询参数提交正确的值,javax.ConstraintViolationexception则会引发该错误。异常消息将如下所示:

getWhatever.pageNumber must be equal or greater than 0

在响应正文中,我希望显示以下消息:

page-number must be equal or greater than 0

我希望我的消息具有查询参数的名称,而不是参数的名称。恕我直言,包括参数的名称都暴露了实现细节。

问题是,我找不到携带查询参数名称的对象。好像ConstraintViolationException没有这个功能

我正在 spring-boot 中运行我的应用程序。

任何帮助,将不胜感激。

PS:我去过其他声称可以解决问题的类似线程,但实际上没有一个真正做到了。

java hibernate-validator spring-boot javax.validation

6
推荐指数
1
解决办法
6211
查看次数

新页面上的 Jasper Reports 子报表

我有一个由 6 个子报告组成的 jasper 报告。每个子报表都有自己的标题部分。现在我需要每个报告都在新页面上开始。我尝试添加分页符,但对我不起作用。

在研究这个问题时,我遇到了一个技巧,isTitleNewPage可以truehttp://www.dzone.com/snippets/jasperreports-subreport-new设置。因此,我将属性添加isTitleNewPage="true"到所有子报表,但现在子报表的标题仍保留在上一页,并且该子报表的详细信息和摘要显示在新页面上。

我怎么解决这个问题??

jasper-reports

5
推荐指数
2
解决办法
2万
查看次数

Java中的Espresso布尔函数优化

我需要优化在体内有20个以上变量的布尔函数,并且必须快速完成.我尝试了Quine-McCluskey算法,当变量数大于10时,它还不够好.我读到Espresso方法可以处理更多数量的变量而没有明显的开销.是否有一个实现此算法的java lib?我会感激任何反馈.

java algorithm optimization boolean-logic boolean-expression

5
推荐指数
0
解决办法
1187
查看次数

无法在 dropwizard 中配置日志记录。无法解析类型 ID“控制台”

我正在尝试在我的项目中配置控制台附加程序。当我在本地调试中启动我的应用程序时,它工作正常。然而,当我在 Gradle 中构建我的 jar 并将其作为独立应用程序启动时,我收到以下错误:

[0]; Could not resolve type id 'console' into a subtype of [simple type, class io.dropwizard.logging.AppenderFactory]: known type ids = [AppenderFactory]
Run Code Online (Sandbox Code Playgroud)

这是我的文件的片段.yml

logging:
    appenders:
    - type: console
      timeZone: UTC
      logFormat: '%-5level [%date{ISO8601}] [%X{requestId}] %c: %msg%n%rootException'
Run Code Online (Sandbox Code Playgroud)

我能够找到有关类似问题的帖子。例如,这里的线程建议检查文件是否META-INF/services/io.dropwizard.logging.AppenderFactory在应用程序 jar 中以及其内容是否如下:

io.dropwizard.logging.ConsoleAppenderFactory
io.dropwizard.logging.FileAppenderFactory
io.dropwizard.logging.SyslogAppenderFactory
Run Code Online (Sandbox Code Playgroud)

我已经验证过了。就我而言,文件位于它应该在的位置,并且内容完全相同。任何帮助将不胜感激。

Dropwizard 版本 0.9.2

gradle dropwizard

5
推荐指数
1
解决办法
1871
查看次数

使用 Mapstruct 映射类的层次结构

我有一个类层次结构:VehicleDTO是一个基本抽象类。 CarDTO, TruckDTO, VanDTO从它延伸出来。

我在映射器的另一侧有相同的层次结构: VehicleBO <- CarBO, TruckBO, VanBO

我希望将所有映射逻辑合并到一个映射器中。时期。

我已经为公共属性定义了映射,但是当它变得有趣时,我在编译过程中得到了这个异常:

The return type ... is an abstract class or interface.
Provide a non abstract / non interface result type or a factory method. 
Run Code Online (Sandbox Code Playgroud)

那么,我如何指定一个工厂方法,它基于特定属性的值或 pojo 的类,为我创建一个目标对象?我希望有一个能真正解决问题的好的代码片段。

谢谢!

mapstruct

5
推荐指数
1
解决办法
7767
查看次数