小编usm*_*man的帖子

如果消息是由制作人制作的,如何从Kafka经纪人那里得到确认?

当我发出消息时,我想从经纪人那里得到一些回应.我已经尝试过使用的CallBack机制(通过实现CallBack),KafkaProducer.send但它没有工作,也没有调用onCompletion方法.

当我关闭Kafka服务器并尝试生成消息时,它会调用回调方法.

有没有其他方式得到确认?

@Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        System.out.println("Called Callback method");
        if (metadata != null) {
            System.out.println("message(" + key + ", " + message
                    + ") sent to partition(" + metadata.partition() + "), "
                    + "offset(" + metadata.offset() + ") in " + elapsedTime
                    + " ms");
        } else {
            exception.printStackTrace();
        }

    }

props.put("bootstrap.servers", "localhost:9092");
props.put("client.id", "mytopic");
props.put("key.serializer", org.apache.kafka.common.serialization.StringSerializer.class);
props.put("value.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class);

KafkaProducer<String, byte[]> producer = new KafkaProducer<String,byte[]>(props); …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka kafka-consumer-api kafka-producer-api

7
推荐指数
2
解决办法
2万
查看次数

我们怎么知道文件是否在save()方法后保存在Mongodb中?

我将文档保存在MongoDb中并使用带有类,集合名称参数的save方法.

save和insert的所有方法都是void返回类型.那我怎么知道我的文件是否已保存.

是我必须重新查询以检查我的文档是否已保存.我只需要任何返回值来查看它是否已保存.

此外,我使用Spring Data for Mongo进行操作.

java mongodb spring-data-mongodb

6
推荐指数
1
解决办法
2834
查看次数

Kafka zookeeper继续显示信息消息'接受来自/10.xxx.xxx.xxx的套接字连接'

我正在研究现有的Kafka但是有些它如何停止响应配置'retention.ms'.所以我安装了Kafka 0.8.2.1的新副本并运行它.但它会不断显示以下信息消息:它显示的IP是我们网络上的有效IP,它会在其上生成消息.但即使它的机器关闭它仍然显示它

[2016-02-18 10:46:10,830] INFO Accepted socket connection from /10.20.xxx.xxx:5988
1 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2016-02-18 10:46:10,832] INFO Refusing session request for client /10.20.xxx.xxx:
59881 as it has seen zxid 0x8b47 our last zxid is 0x165 client must try another
server (org.apache.zookeeper.server.ZooKeeperServer)
[2016-02-18 10:46:10,832] INFO Closed socket connection for client /10.20.xxx.xxx:
59881 (no session established for client) (org.apache.zookeeper.server.NIOServer
Cnxn)
Run Code Online (Sandbox Code Playgroud)

java apache-kafka apache-zookeeper

6
推荐指数
1
解决办法
695
查看次数

使用apache camel的camel-kafka组件手动提交消费者偏移

我能够使用apache kafka提交偏移类并能够使用ConsumerConnector进行提交.我查看了apache camel-kafka组件,其消费者选项为" autoCommitEnable ",与" auto.commit.enable "属性相同.现在是Camel Java DSL中的任何属性或方法,在消费消息之后我们可以手动提交偏移(通过URL中提供的方法或消费者选项) 或者 我们必须再次使用Kafka Consumer API来提交消费者偏移吗?

java apache-camel apache-kafka

5
推荐指数
1
解决办法
659
查看次数

如何使用Camel-kafka提交消费者抵消?

我正在使用apache camel来集成我的kafka消息.我也使用JAVA DSL来消费来自kafka端点的消息.

使用apache kafka API已知如何使用给定的属性切换提交使用者偏移量.

如果我在camel-kafka组件中取消自动提交,那我怎么能在aplhe camel中提交偏移呢?

我使用下面的端点来禁用Apache Camel中的自动提交

kafka.consumer.uri = kafka://{{kafka.host}}?zookeeperHost={{zookeeper.host}}&zookeeperPort={{zookeeper.port}}&groupId={{kafka.consumerGroupId}}&topic={{kafka.topic.transformer_t}}&{{kafka.uri.options}}
kafka.uri.options = autoCommitEnable=false
Run Code Online (Sandbox Code Playgroud)

java apache-camel

5
推荐指数
1
解决办法
1063
查看次数

如何使用使用 Java 11 的最新 SonarQube 免费版本分析 Java 8 代码?

我已经使用其配置中指出的 Java 11 安装了最新版本的 SonarQube wrapper.config。我读到它甚至可以分析用 Java 8 编写的代码,但我收到以下异常:

[错误] 无法在项目 ptf-parent 上执行目标 org.sonarsource.scanner.maven:sonar-maven-plugin:3.8.0.2131:sonar (default-cli):目标 org.sonarsource.scanner.maven 的执行 default-cli :sonar-maven-plugin:3.8.0.2131:sonar失败:执行org.sonarsource.scanner.maven时遇到API不兼容:sonar-maven-plugin:3.8.0.2131:sonar:java.lang.UnsupportedClassVersionError:org/sonar /batch/bootstrapper/EnvironmentInformation 已由更新版本的 Java 运行时(类文件版本 55.0)编译,此版本的 Java 运行时仅识别最高版本为 52.0 的类文件

目前,我的代码是使用 Java 8 构建的,我想使用最新的 SonarQube 版本进行分析。

我正在从 Eclipse IDE 运行分析器。我的所有项目都配置为使用Java 8和Maven 3.8.2来使用声纳参数在SonarQube上推送报告,但它不起作用。

以下是我在 Eclipse Run as Maven 构建中提供的参数:

 -e clean install sonar:sonar -Dsonar.java.jdkHome=C:\Program Files\Java\jdk1.8.0_251 -Dsonar.project
-Dsonar.projectKey=TESTPROJECT -Dsonar.host.url=http://localhost:9000 -Dsonar.login=a2603f6cd079e4a7833179333333333333333333e
Run Code Online (Sandbox Code Playgroud)

java sonarqube

5
推荐指数
1
解决办法
1万
查看次数

无法在kafka consumer下设置'max.poll.records',其中cons.poll仍然返回分区下的所有记录

我创建了多线程消费者应用程序来处理各种分区.查看各种博客,我开始了解'max.poll.records'属性,以便控制来自给定主题,分区的记录集.(因此它可以很快从记录循环中出来,因此调用cons.poll ()保持活力)

问题是我的处理逻辑需要时间来处理每条记录.在启动Cons-2时,两者都开始在相同的分区上工作,因为Cons-1仍未进行重新平衡(即尚未发生cons.poll()).

增加消费者以便他们可以重新平衡他们自己,cons.poll()将不会发生,除非处理所有记录.

我可能不会去'session.timeout.ms',因为开始新的消费者也可能开始使用与Cons-1相同的分区.

我尝试使用以下方法设置属性:

props.put("max.poll.records",1);
props.put("max.poll.records","1");
Run Code Online (Sandbox Code Playgroud)

但都没有改变.来自民意调查的记录.

我正在使用Apache Kafka 9和Below API.

<dependency>
    <groupId>org.apache.servicemix.bundles</groupId>
    <artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId>
    <version>0.9.0.1_1</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)

java apache-kafka kafka-consumer-api

4
推荐指数
1
解决办法
1万
查看次数

关闭后如何重新连接kafka生产者?

我有一个多线程应用程序,它使用生产者类来生成消息,早些时候我使用下面的代码为每个请求创建生产者。其中 KafkaProducer 是用每个请求新建的,如下所示:

KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(prop);

ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte[]>(topic, objBytes);
producer.send(data, new Callback() {

                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        isValidMsg[0] = false;
                        exception.printStackTrace();
                        saveOrUpdateLog(msgBean, producerType, exception);
                        logger.error("ERROR:Unable to produce message.",exception);
                    }
                }
            });
producer.close();
Run Code Online (Sandbox Code Playgroud)

然后我阅读了关于生产者的 Kafka 文档,并知道我们应该使用单个生产者实例来获得良好的性能。

然后我在单例类中创建了 KafkaProducer 的单个实例。

现在我们应该在何时何地关闭生产者。显然,如果我们在第一次发送请求后关闭生产者,它不会找到生产者重新发送消息,因此抛出:

java.lang.IllegalStateException: Cannot send after the producer is closed.
Run Code Online (Sandbox Code Playgroud)

或者我们如何在关闭后重新连接到生产者。问题是如果程序崩溃或有异常呢?

java multithreading scala apache-kafka

4
推荐指数
1
解决办法
1万
查看次数

我可以在karaf(任何特定文件夹)中放置第三方jar来解决传递依赖吗?

我有各种自制项目,具有第三方库依赖项.我将它们捆绑为OSGI容器但无法解决我的项目中的深度依赖项.现在我正在寻找karaf文件夹,我可以放置我的库,所以捆绑可以直接访问它们而不是安装它们.

更多我正在使用maven.

编辑:在遵循你的'功能'解决方案后,我能够产生明显的传递依赖性,但问题是现在它也寻找非常常见的java文件:例如:下面是相当大的依赖列表:

bsh -- Cannot be resolved
com.fasterxml.jackson.annotation -- Cannot be resolved
com.fasterxml.jackson.core -- Cannot be resolved
com.fasterxml.jackson.databind -- Cannot be resolved
com.fasterxml.jackson.databind.annotation -- Cannot be resolved
com.fasterxml.jackson.databind.module -- Cannot be resolved
com.gargoylesoftware.htmlunit -- Cannot be resolved
com.gargoylesoftware.htmlunit.util -- Cannot be resolved
com.google.protobuf -- Cannot be resolved
com.ibm.uvm.tools -- Cannot be resolved
com.ibm.websphere.uow -- Cannot be resolved
com.ibm.wsspi.uow -- Cannot be resolved
com.jamonapi -- Cannot be resolved
com.jamonapi.utils -- Cannot be resolved
com.jayway.jsonpath -- Cannot be resolved
com.jcraft.jzlib -- …
Run Code Online (Sandbox Code Playgroud)

java packaging maven apache-karaf karaf

3
推荐指数
1
解决办法
2582
查看次数

如何检查某个消费者是否使用java连接到Kafka 0.9.0.x?

如何获取kafka上的连接消费者列表?由于消费者在代理上连接,是否有像ZkClient/ZkUtils这样的java实用程序来获取Kafka 0.9.0.x中连接消费者的列表?就像我们使用以下实用程序获取经纪人列表一样:

        ZkClient zkClient = new ZkClient(endpoint.getZookeeperConnect(), 60000);

        if(zkClient!=null){
            List<String> brokerIds = zkClient.getChildren(ZkUtils.BrokerIdsPath());
            if(CollectionUtils.isNotEmpty(brokerIds) &&  brokerIds.contains(brokerId)){
                logger.debug("Broker:{{}} is connected to Zookeeper.",brokerId);
                flag = true;    
            }
            else{
                logger.error("ERROR:Broker:{{}} is not connected to Zookeeper.",brokerId);
            }
            zkClient.close();
        }
Run Code Online (Sandbox Code Playgroud)

我正在使用Kafka 0.9.0.x和maven下面的java lib:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.9.0.1</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)

更新:

我打开了一个' kafka-console-consumer.bat '并运行了一次然后越过了cmd提示符.然后继续" zookeeper-shell.bat "和ls/consumer然后显示[console-consumer-6008],但我的编程消费者没有显示.使用zkClient.getChildren(ZkUtils.ConsumersPath())我现在可以只查看提到的消费者.

java apache-kafka apache-zookeeper

3
推荐指数
1
解决办法
6265
查看次数

如何从目录中获取特定数量的文件?

我想根据我在properties文件中提供的设置检索文件.例如,我只想在第一次迭代中获得50个文件并停止获取所有文件夹中可能存在数千个文件.

我怎么能随机获得50个文件并且没有获得所有列表或迭代文件以获得50?

filesList = folder.listFiles( new FileFilter() {                
    @Override
    public boolean accept(File name) {                      
        return (name.isFile() && ( name.getName().contains("key1")));
    }
});
Run Code Online (Sandbox Code Playgroud)

编辑:我删除了for声明.即使我只提供了一个文件夹来获取它将获取所有文件,计数器变量仍然循环文件夹中的所有文件不是一个好的解决方案.

java file filefilter java-7

1
推荐指数
1
解决办法
164
查看次数