我有一个消费者工作者应用程序,内部正在启动X
线程数,每个线程产生它的KafkaCosnumer.Cosnumers拥有groupId
相同的主题并订阅相同的主题.因此,每个消费者都可以获得公平的分区份额.
处理的本质是我不能丢失消息,也不能允许重复.我正在运行的kafka版本是0.10.2.1.
这是我面临的问题:消费者线程1开始消费消息,然后poll()
获取一批消息.我也实现了ConsumerRebalanceListener
,所以每次成功处理消息时,它都会被添加到offsets
地图中.(请参阅下面的代码.)因此,一旦重新平衡发生,我可以在将分区重新分配给其他使用者之前提交我的偏移量.有时,为了处理该批处理,需要更长的时间max.poll.interval.ms
,这是重新平衡发生的地方,分区从消费者1中提取并分配给消费者2.消费者1不知道分区被撤销并继续处理消息,在同时,消费者2从最后一个偏移量(由RebalanceListener提交)中获取并处理相同的消息.
有没有办法通知消费者他已撤销分区,以便他可以停止处理已经分配给其他消费者的循环中的消息?
public class RebalanceListener<K, V> implements ConsumerRebalanceListener {
private final KafkaConsumer<K, V> consumer;
private static final ConcurrentMap<TopicPartition, OffsetAndMetadata> CURRENT_OFFSETS =
Maps.newConcurrentMap();
private static final Logger LOGGER = LoggerFactory.getLogger(RebalanceListener.class);
public RebalanceListener(KafkaConsumer<K, V> consumer) {
this.consumer = consumer;
}
public void addOffset(String topic, int partition, long offset) {
LOGGER.debug("message=Adding offset to offsets map, topic={}, partition={}, offset={}",
topic, partition, offset);
CURRENT_OFFSETS.put(new TopicPartition(topic, partition),
new OffsetAndMetadata(offset, "commit"));
}
public …
Run Code Online (Sandbox Code Playgroud) 我想在我的 IntelliJ Idea 中启用 AspectJ 编译器,因为我想在编译时编织几个方面。
同时,我在我的代码库中使用 Lombok 和 Mapstruct。
这两个需要额外的注释处理,这必须在ajc
开始之前发生。我为 Lombok 和 Mapstruct 安装了两个插件。它们独立工作正常,正在生成源代码。但是当我启用ajc
并勾选Enable annotation processing options
,然后构建一个项目时,我得到:
Error:(9, 0) ajc: Internal error in the mapping processor: java.lang.NullPointerException at org.aspectj.org.eclipse.jdt.internal.compiler.lookup.FieldBinding.sourceField(FieldBinding.java:425) at org.aspectj.org.eclipse.jdt.internal.compiler.apt.model.TypeElementImpl$SourceLocationComparator.determineSourceStart(TypeElementImpl.java:108) at org.aspectj.org.eclipse.jdt.internal.compiler.apt.model.TypeElementImpl$SourceLocationComparator.getSourceStart(TypeElementImpl.java:72) at org.aspectj.org.eclipse.jdt.internal.compiler.apt.model.TypeElementImpl$SourceLocationComparator.compare(TypeElementImpl.java:65) at org.aspectj.org.eclipse.jdt.internal.compiler.apt.model.TypeElementImpl$SourceLocationComparator.compare(TypeElementImpl.java:1) at java.util.TimSort.countRunAndMakeAscending(TimSort.java:360) at java.util.TimSort.sort(TimSort.java:234) at java.util.Arrays.sort(Arrays.java:1512) at java.util.ArrayList.sort(ArrayList.java:1462) at java.util.Collections.sort(Collections.java:175) at org.aspectj.org.eclipse.jdt.internal.compiler.apt.model.TypeElementImpl.getEnclosedElements(TypeElementImpl.java:166) at org.mapstruct.ap.internal.util.workarounds.SpecificCompilerWorkarounds.replaceTypeElementIfNecessary(SpecificCompilerWorkarounds.java:99) at org.mapstruct.ap.internal.util.Executables.getAllEnclosedExecutableElements(Executables.java:99) at org.mapstruct.ap.internal.model.common.Type.getAllMethods(Type.java:633) at org.mapstruct.ap.internal.model.common.Type.getPropertyReadAccessors(Type.java:496) at org.mapstruct.ap.internal.model.BeanMappingMethod$Builder.build(BeanMappingMethod.java:168) at org.mapstruct.ap.internal.processor.MapperCreationProcessor.getMappingMethods(MapperCreationProcessor.java:376) at org.mapstruct.ap.internal.processor.MapperCreationProcessor.getMapper(MapperCreationProcessor.java:151) at org.mapstruct.ap.internal.processor.MapperCreationProcessor.process(MapperCreationProcessor.java:122) at org.mapstruct.ap.internal.processor.MapperCreationProcessor.process(MapperCreationProcessor.java:76) at org.mapstruct.ap.MappingProcessor.process(MappingProcessor.java:283) at org.mapstruct.ap.MappingProcessor.processMapperTypeElement(MappingProcessor.java:263) at org.mapstruct.ap.MappingProcessor.processMapperElements(MappingProcessor.java:221) …
Run Code Online (Sandbox Code Playgroud) 我有一个分区主题,其中有X
分区。
截至目前,在生成消息时,我ProducerRecord
只创建了 Kafka 的指定topic
和value
. 我没有定义key
. 据我所知,我的消息将使用默认的内置分区器在分区之间均匀分布。另一方面,我有一个 Kafka 消费者线程池。每个 Kafka 消费者都将在其自己的专用线程中运行,以消费来自主题的消息。这些消费者中的每一个都被赋予相同的group.id
. 这将允许并行使用消息。每个消费者都将被分配其公平份额的分区以供读取。
我希望我的消息以有序的方式被消费。我知道 Kafka 保证了分区内消息的顺序。所以,只要我想出一个合适的密钥结构,我就会对我的消息进行分区,使它们最终在同一个分区中。在某种程度上,消息键将消息分组并将它们存储在分区中。
是否有意义?
问:是否有可能由于设计不当的密钥而导致分区不均匀?一个人可能会收到比其他人更多的记录。它会以一种糟糕的方式影响我的 Kafka 集群的性能吗?消息密钥设计的最佳实践是什么?
multithreading multiprocessing apache-kafka kafka-consumer-api kafka-producer-api
执行kafka客户端的生产者/消费者连接池是否有意义?
kafka是否在内部维护一个已初始化并可以使用的连接对象列表?
我们希望最小化连接创建的时间,以便在发送/接收消息时不会产生额外的开销.
目前我们正在使用apache commons-pool库GenericObjectPool
来保持连接.
任何帮助将不胜感激.
我的应用程序严重依赖异步Web服务。它是用Spring Boot 1.5.x构建的,它使我可以利用标准Java 8 CompletableFuture<T>
来产生延迟的异步响应。有关更多信息,请参见
https://nickebbitt.github.io/blog/2017/03/22/async-web-service-using-completable-future
Spring Boot 2.0.x现在带有可以利用反应范例的入门包。Spring WebFlux是框架,用于实现响应式HTTP。
由于我已经按照第一段所述实现了API,因此通过重做服务以使用非阻塞反应式方法,我能获得很多收益吗?简而言之,我还将拥有非阻塞API,对吧?
有一个例子如何在基于异步API转换CompletableFuture<T>
到Mono<T>\Flux<T>
?
我当时正在考虑完全摆脱基于servlet的服务器(在我的情况下为Jetty),并选择Netty + Reactor。
不用说,我对整个反应式范式都是陌生的。
我想听听您的意见。
spring spring-boot project-reactor completable-future spring-webflux
我有一个服务方法:
@GetMapping(path = "/api/some/path", produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<?> getWhatever(@RequestParam(value = "page-number", defaultValue = "0") @Min(0) Integer pageNumber, ...
Run Code Online (Sandbox Code Playgroud)
如果 API 的调用者没有为page-number
查询参数提交正确的值,javax.ConstraintViolationexception
则会引发该错误。异常消息将如下所示:
getWhatever.pageNumber must be equal or greater than 0
在响应正文中,我希望显示以下消息:
page-number must be equal or greater than 0
我希望我的消息具有查询参数的名称,而不是参数的名称。恕我直言,包括参数的名称都暴露了实现细节。
问题是,我找不到携带查询参数名称的对象。好像ConstraintViolationException
没有这个功能
我正在 spring-boot 中运行我的应用程序。
任何帮助,将不胜感激。
PS:我去过其他声称可以解决问题的类似线程,但实际上没有一个真正做到了。
我有一个由 6 个子报告组成的 jasper 报告。每个子报表都有自己的标题部分。现在我需要每个报告都在新页面上开始。我尝试添加分页符,但对我不起作用。
在研究这个问题时,我遇到了一个技巧,isTitleNewPage
可以true
在
http://www.dzone.com/snippets/jasperreports-subreport-new设置。因此,我将属性添加isTitleNewPage="true"
到所有子报表,但现在子报表的标题仍保留在上一页,并且该子报表的详细信息和摘要显示在新页面上。
我怎么解决这个问题??
我需要优化在体内有20个以上变量的布尔函数,并且必须快速完成.我尝试了Quine-McCluskey算法,当变量数大于10时,它还不够好.我读到Espresso方法可以处理更多数量的变量而没有明显的开销.是否有一个实现此算法的java lib?我会感激任何反馈.
java algorithm optimization boolean-logic boolean-expression
我正在尝试在我的项目中配置控制台附加程序。当我在本地调试中启动我的应用程序时,它工作正常。然而,当我在 Gradle 中构建我的 jar 并将其作为独立应用程序启动时,我收到以下错误:
[0]; Could not resolve type id 'console' into a subtype of [simple type, class io.dropwizard.logging.AppenderFactory]: known type ids = [AppenderFactory]
Run Code Online (Sandbox Code Playgroud)
这是我的文件的片段.yml
:
logging:
appenders:
- type: console
timeZone: UTC
logFormat: '%-5level [%date{ISO8601}] [%X{requestId}] %c: %msg%n%rootException'
Run Code Online (Sandbox Code Playgroud)
我能够找到有关类似问题的帖子。例如,这里的线程建议检查文件是否META-INF/services/io.dropwizard.logging.AppenderFactory
在应用程序 jar 中以及其内容是否如下:
io.dropwizard.logging.ConsoleAppenderFactory
io.dropwizard.logging.FileAppenderFactory
io.dropwizard.logging.SyslogAppenderFactory
Run Code Online (Sandbox Code Playgroud)
我已经验证过了。就我而言,文件位于它应该在的位置,并且内容完全相同。任何帮助将不胜感激。
Dropwizard 版本 0.9.2
我有一个类层次结构:VehicleDTO
是一个基本抽象类。
CarDTO, TruckDTO, VanDTO
从它延伸出来。
我在映射器的另一侧有相同的层次结构:
VehicleBO <- CarBO, TruckBO, VanBO
。
我希望将所有映射逻辑合并到一个映射器中。时期。
我已经为公共属性定义了映射,但是当它变得有趣时,我在编译过程中得到了这个异常:
The return type ... is an abstract class or interface.
Provide a non abstract / non interface result type or a factory method.
Run Code Online (Sandbox Code Playgroud)
那么,我如何指定一个工厂方法,它基于特定属性的值或 pojo 的类,为我创建一个目标对象?我希望有一个能真正解决问题的好的代码片段。
谢谢!
java ×4
apache-kafka ×3
mapstruct ×2
spring-boot ×2
algorithm ×1
aspectj ×1
dropwizard ×1
gradle ×1
optimization ×1
spring ×1