当我发出消息时,我想从经纪人那里得到一些回应.我已经尝试过使用的CallBack机制(通过实现CallBack),KafkaProducer.send
但它没有工作,也没有调用onCompletion
方法.
当我关闭Kafka服务器并尝试生成消息时,它会调用回调方法.
有没有其他方式得到确认?
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
long elapsedTime = System.currentTimeMillis() - startTime;
System.out.println("Called Callback method");
if (metadata != null) {
System.out.println("message(" + key + ", " + message
+ ") sent to partition(" + metadata.partition() + "), "
+ "offset(" + metadata.offset() + ") in " + elapsedTime
+ " ms");
} else {
exception.printStackTrace();
}
}
props.put("bootstrap.servers", "localhost:9092");
props.put("client.id", "mytopic");
props.put("key.serializer", org.apache.kafka.common.serialization.StringSerializer.class);
props.put("value.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class);
KafkaProducer<String, byte[]> producer = new KafkaProducer<String,byte[]>(props); …
Run Code Online (Sandbox Code Playgroud) 我将文档保存在MongoDb中并使用带有类,集合名称参数的save方法.
save和insert的所有方法都是void返回类型.那我怎么知道我的文件是否已保存.
是我必须重新查询以检查我的文档是否已保存.我只需要任何返回值来查看它是否已保存.
此外,我使用Spring Data for Mongo进行操作.
我正在研究现有的Kafka但是有些它如何停止响应配置'retention.ms'.所以我安装了Kafka 0.8.2.1的新副本并运行它.但它会不断显示以下信息消息:它显示的IP是我们网络上的有效IP,它会在其上生成消息.但即使它的机器关闭它仍然显示它
[2016-02-18 10:46:10,830] INFO Accepted socket connection from /10.20.xxx.xxx:5988
1 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2016-02-18 10:46:10,832] INFO Refusing session request for client /10.20.xxx.xxx:
59881 as it has seen zxid 0x8b47 our last zxid is 0x165 client must try another
server (org.apache.zookeeper.server.ZooKeeperServer)
[2016-02-18 10:46:10,832] INFO Closed socket connection for client /10.20.xxx.xxx:
59881 (no session established for client) (org.apache.zookeeper.server.NIOServer
Cnxn)
Run Code Online (Sandbox Code Playgroud) 我能够使用apache kafka提交偏移类并能够使用ConsumerConnector进行提交.我查看了apache camel-kafka组件,其消费者选项为" autoCommitEnable ",与" auto.commit.enable "属性相同.现在是Camel Java DSL中的任何属性或方法,在消费消息之后我们可以手动提交偏移(通过URL中提供的方法或消费者选项) 或者 我们必须再次使用Kafka Consumer API来提交消费者偏移吗?
我正在使用apache camel来集成我的kafka消息.我也使用JAVA DSL来消费来自kafka端点的消息.
使用apache kafka API已知如何使用给定的属性切换提交使用者偏移量.
如果我在camel-kafka组件中取消自动提交,那我怎么能在aplhe camel中提交偏移呢?
我使用下面的端点来禁用Apache Camel中的自动提交
kafka.consumer.uri = kafka://{{kafka.host}}?zookeeperHost={{zookeeper.host}}&zookeeperPort={{zookeeper.port}}&groupId={{kafka.consumerGroupId}}&topic={{kafka.topic.transformer_t}}&{{kafka.uri.options}}
kafka.uri.options = autoCommitEnable=false
Run Code Online (Sandbox Code Playgroud) 我已经使用其配置中指出的 Java 11 安装了最新版本的 SonarQube wrapper.config
。我读到它甚至可以分析用 Java 8 编写的代码,但我收到以下异常:
[错误] 无法在项目 ptf-parent 上执行目标 org.sonarsource.scanner.maven:sonar-maven-plugin:3.8.0.2131:sonar (default-cli):目标 org.sonarsource.scanner.maven 的执行 default-cli :sonar-maven-plugin:3.8.0.2131:sonar失败:执行org.sonarsource.scanner.maven时遇到API不兼容:sonar-maven-plugin:3.8.0.2131:sonar:java.lang.UnsupportedClassVersionError:org/sonar /batch/bootstrapper/EnvironmentInformation 已由更新版本的 Java 运行时(类文件版本 55.0)编译,此版本的 Java 运行时仅识别最高版本为 52.0 的类文件
目前,我的代码是使用 Java 8 构建的,我想使用最新的 SonarQube 版本进行分析。
我正在从 Eclipse IDE 运行分析器。我的所有项目都配置为使用Java 8和Maven 3.8.2来使用声纳参数在SonarQube上推送报告,但它不起作用。
以下是我在 Eclipse Run as Maven 构建中提供的参数:
-e clean install sonar:sonar -Dsonar.java.jdkHome=C:\Program Files\Java\jdk1.8.0_251 -Dsonar.project
-Dsonar.projectKey=TESTPROJECT -Dsonar.host.url=http://localhost:9000 -Dsonar.login=a2603f6cd079e4a7833179333333333333333333e
Run Code Online (Sandbox Code Playgroud) 我创建了多线程消费者应用程序来处理各种分区.查看各种博客,我开始了解'max.poll.records'属性,以便控制来自给定主题,分区的记录集.(因此它可以很快从记录循环中出来,因此调用cons.poll ()保持活力)
问题是我的处理逻辑需要时间来处理每条记录.在启动Cons-2时,两者都开始在相同的分区上工作,因为Cons-1仍未进行重新平衡(即尚未发生cons.poll()).
增加消费者以便他们可以重新平衡他们自己,cons.poll()将不会发生,除非处理所有记录.
我可能不会去'session.timeout.ms',因为开始新的消费者也可能开始使用与Cons-1相同的分区.
我尝试使用以下方法设置属性:
props.put("max.poll.records",1);
props.put("max.poll.records","1");
Run Code Online (Sandbox Code Playgroud)
但都没有改变.来自民意调查的记录.
我正在使用Apache Kafka 9和Below API.
<dependency>
<groupId>org.apache.servicemix.bundles</groupId>
<artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId>
<version>0.9.0.1_1</version>
</dependency>
Run Code Online (Sandbox Code Playgroud) 我有一个多线程应用程序,它使用生产者类来生成消息,早些时候我使用下面的代码为每个请求创建生产者。其中 KafkaProducer 是用每个请求新建的,如下所示:
KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(prop);
ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte[]>(topic, objBytes);
producer.send(data, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
isValidMsg[0] = false;
exception.printStackTrace();
saveOrUpdateLog(msgBean, producerType, exception);
logger.error("ERROR:Unable to produce message.",exception);
}
}
});
producer.close();
Run Code Online (Sandbox Code Playgroud)
然后我阅读了关于生产者的 Kafka 文档,并知道我们应该使用单个生产者实例来获得良好的性能。
然后我在单例类中创建了 KafkaProducer 的单个实例。
现在我们应该在何时何地关闭生产者。显然,如果我们在第一次发送请求后关闭生产者,它不会找到生产者重新发送消息,因此抛出:
java.lang.IllegalStateException: Cannot send after the producer is closed.
Run Code Online (Sandbox Code Playgroud)
或者我们如何在关闭后重新连接到生产者。问题是如果程序崩溃或有异常呢?
我有各种自制项目,具有第三方库依赖项.我将它们捆绑为OSGI容器但无法解决我的项目中的深度依赖项.现在我正在寻找karaf文件夹,我可以放置我的库,所以捆绑可以直接访问它们而不是安装它们.
更多我正在使用maven.
编辑:在遵循你的'功能'解决方案后,我能够产生明显的传递依赖性,但问题是现在它也寻找非常常见的java文件:例如:下面是相当大的依赖列表:
bsh -- Cannot be resolved
com.fasterxml.jackson.annotation -- Cannot be resolved
com.fasterxml.jackson.core -- Cannot be resolved
com.fasterxml.jackson.databind -- Cannot be resolved
com.fasterxml.jackson.databind.annotation -- Cannot be resolved
com.fasterxml.jackson.databind.module -- Cannot be resolved
com.gargoylesoftware.htmlunit -- Cannot be resolved
com.gargoylesoftware.htmlunit.util -- Cannot be resolved
com.google.protobuf -- Cannot be resolved
com.ibm.uvm.tools -- Cannot be resolved
com.ibm.websphere.uow -- Cannot be resolved
com.ibm.wsspi.uow -- Cannot be resolved
com.jamonapi -- Cannot be resolved
com.jamonapi.utils -- Cannot be resolved
com.jayway.jsonpath -- Cannot be resolved
com.jcraft.jzlib -- …
Run Code Online (Sandbox Code Playgroud) 如何获取kafka上的连接消费者列表?由于消费者在代理上连接,是否有像ZkClient/ZkUtils这样的java实用程序来获取Kafka 0.9.0.x中连接消费者的列表?就像我们使用以下实用程序获取经纪人列表一样:
ZkClient zkClient = new ZkClient(endpoint.getZookeeperConnect(), 60000);
if(zkClient!=null){
List<String> brokerIds = zkClient.getChildren(ZkUtils.BrokerIdsPath());
if(CollectionUtils.isNotEmpty(brokerIds) && brokerIds.contains(brokerId)){
logger.debug("Broker:{{}} is connected to Zookeeper.",brokerId);
flag = true;
}
else{
logger.error("ERROR:Broker:{{}} is not connected to Zookeeper.",brokerId);
}
zkClient.close();
}
Run Code Online (Sandbox Code Playgroud)
我正在使用Kafka 0.9.0.x和maven下面的java lib:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.9.0.1</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)
更新:
我打开了一个' kafka-console-consumer.bat '并运行了一次然后越过了cmd提示符.然后继续" zookeeper-shell.bat "和ls/consumer然后显示[console-consumer-6008],但我的编程消费者没有显示.使用zkClient.getChildren(ZkUtils.ConsumersPath())
我现在可以只查看提到的消费者.
我想根据我在properties
文件中提供的设置检索文件.例如,我只想在第一次迭代中获得50个文件并停止获取所有文件夹中可能存在数千个文件.
我怎么能随机获得50个文件并且没有获得所有列表或迭代文件以获得50?
filesList = folder.listFiles( new FileFilter() {
@Override
public boolean accept(File name) {
return (name.isFile() && ( name.getName().contains("key1")));
}
});
Run Code Online (Sandbox Code Playgroud)
编辑:我删除了for
声明.即使我只提供了一个文件夹来获取它将获取所有文件,计数器变量仍然循环文件夹中的所有文件不是一个好的解决方案.
java ×11
apache-kafka ×6
apache-camel ×2
apache-karaf ×1
file ×1
filefilter ×1
java-7 ×1
karaf ×1
maven ×1
mongodb ×1
packaging ×1
scala ×1
sonarqube ×1