我需要找到一种方法来向卡夫卡询问一系列主题.我知道我可以使用目录中kafka-topics.sh包含的脚本来做到这一点bin\.有了这个列表,我需要每个主题的所有消费者.我找不到该目录中的脚本,也没有在kafka-consumer-api库中找到允许我这样做的类.
这背后的原因是我需要弄清楚主题偏移量与消费者偏移量之间的差异.
有没有办法实现这个目标?或者我是否需要在每个消费者中实现此功能?
我有点新鲜Observers,我仍然想弄明白.我有以下代码:
observableKafka.getRealTimeEvents()
.filter(this::isTrackedAccount)
.filter(e -> LedgerMapper.isDepositOrClosedTrade((Transaction) e.getPayload()))
.map(ledgerMapper::mapLedgerTransaction)
.map(offerCache::addTransaction)
.filter(offer -> offer != null) // Offer may have been removed from cache since last check
.filter(Offer::isReady)
.doOnError(throwable -> {
LOG.info("Exception thrown on realtime events");
})
.forEach(awardChecker::awardFailOrIgnore);
Run Code Online (Sandbox Code Playgroud)
getRealTimeEvents()返回一个Observable<Event>.
.doOnError问题的位置是什么?另外,在这段代码中添加多个调用会产生什么影响?我已经意识到我可以做到并且所有这些都被调用,但我不确定它的目的是什么.
我正在使用Vagrant运行Centos虚拟机.机器似乎运行正常,但当我尝试同步Perforce时,我可以看到以下错误:
[vagrant@vagrant-c5-x86_64 ~]$ /perforce/p4 sync -f ...
Perforce client error:
Connect to server failed; check $P4PORT.
failed.TCP connect to perforce.xxx.com:1666
Servname not supported for ai_socktype
Run Code Online (Sandbox Code Playgroud)
我已经阅读了这篇http://www.ducea.com/2006/09/11/error-servname-not-supported-for-ai_socktype/并尝试设置端口/etc/services,但它不起作用.我甚至不确定问题是Perforce还是OS相关.
任何提示?
我有一个JSON模式和一个匹配模式的json字符串,除了它可能有一些额外的字段.如果我不添加那些字段,杰克逊将抛出异常objectMapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);.有没有办法获取这些额外字段的集合来记录它们,即使我抛出异常?
这是代码的相关位:
public boolean validate(Message<String> json) {
List<String> errorList = jsonSchema.validate(json.getPayload());
ObjectMapper mapper = new ObjectMapper();
try {
Update update = mapper.readValue(json.getPayload(), Update.class);
} catch (IOException e) {
System.out.println("Broken");
}
if(!errorList.isEmpty()) {
LOG.warn("Json message did not match schema: {}", errorList);
}
return true;
}
Run Code Online (Sandbox Code Playgroud) 我们正在将我们的kafka实现升级到.9并使用新的消费者java api来创建消费者.我正在使用以下代码用于消费者,我们正在使用设置主题到消费者,如在线A和线B是调用我们的服务处理我们收到的消息.现在问题是如果我们的消息处理花费超过30秒,我们就会得到Exception.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("auto.offset.reset", "earliest");
props.put("heartbeat.interval.ms", "1000");
props.put("receive.buffer.bytes", 10485760);
props.put("fetch.message.max.bytes", 5242880);
props.put("enable.auto.commit", false);
//with partition assigned to consumer
KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(props);
// TopicPartition partition0 = new TopicPartition("TEST-TOPIC", 0);
//consumer.assign(Arrays.asList(partition0));
//assign topic to consumer without partition
//LINE A
consumer.subscribe(Arrays.asList("TEST-TOPIC"), new ConsumerRebalanceListenerImp());
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
try {
ConsumerRecords<Object, Object> records = consumer.poll(1000);
consumeFromQueue(records);//LINE B
consumer.commitSync();
} catch (CommitFailedException e) {
e.printStackTrace();
System.out.println("CommitFailedException");
} catch (Exception …Run Code Online (Sandbox Code Playgroud) 我对groovy很新,而且我发现通过将一个字段公开,groovy默认提供getter和setter.有没有办法让getter而不是默认的setter?这背后的原因是我有一个Builder,我不想提供对对象字段的访问以进行修改.
我有以下功能:
(defn add-recommendations-to-cache [{:keys [trackingId rec-service recs]} cache]
(assoc-in cache [trackingId rec-service] recs))
Run Code Online (Sandbox Code Playgroud)
我将原子定义为:
(def cache (atom {}))
Run Code Online (Sandbox Code Playgroud)
如果我可以改变传递给函数的参数的顺序,我会使用:
(swap! cache add-recommendations-to-cache msg)
Run Code Online (Sandbox Code Playgroud)
既然我不能,我怎么能swap使用原子,函数和包含第一个参数所需要的消息?我尝试了几种可能的组合(见下文),但似乎都没有.
我试过了:
(swap! cache add-recommendations-to-cache msg cache)
Run Code Online (Sandbox Code Playgroud)
和
(swap! cache (add-recommendations-to-cache msg))
Run Code Online (Sandbox Code Playgroud)
和其他几个没用.