小编use*_*468的帖子

适用于Kafka 0.10.0.0及更高版本的session.timeout.ms和max.poll.interval.ms之间的差异

我不清楚为什么我们需要session.timeout.ms和max.poll.interval.ms以及何时使用一个或另一个或两者?似乎两者都表明时间协调器的上限将等待消费者获得心跳,然后再将其假死.

另外,对于基于KIP-62的 0.10.1.0+版本,它的表现如何?

apache-kafka kafka-consumer-api

36
推荐指数
1
解决办法
3万
查看次数

执行者中的Thread.join()等价物

我有一个新手问题.我有这个代码:

public class Main 
{

    public static void main(String[] args) throws InterruptedException 
    {
        // TODO Auto-generated method stub
        IntHolder aHolder=new IntHolder();
        aHolder.Number=0;

        IncrementorThread A= new IncrementorThread(1, aHolder);
        IncrementorThread B= new IncrementorThread(2, aHolder);
        IncrementorThread C= new IncrementorThread(3, aHolder);

        A.start();
        B.start();
        C.start();

        A.join();
        B.join();
        C.join();
        System.out.println("All threads completed...");

    }

}
Run Code Online (Sandbox Code Playgroud)

这将等待所有线程完成.如果我这样使用Executors:

public class Main 
{

    public static void main(String[] args) 
    {
        // TODO Auto-generated method stub
        IntHolder aHolder=new IntHolder();
        aHolder.number=0;

        IncrementalRunable A= new IncrementalRunable(1, aHolder);
        IncrementalRunable B= new IncrementalRunable(2, aHolder); …
Run Code Online (Sandbox Code Playgroud)

java multithreading executorservice

21
推荐指数
2
解决办法
2万
查看次数

使用visualvm和JMX进行远程监控

我想用jvisualvm(或jconsole)监视远程运行的java(spring boot)应用程序.在本地运行时,我可以在jvisualvm和jconsole中看到托管bean.远程运行时我无法连接.我尝试了几个不同的java进程(例如使用spring xd).在SO和Google上寻找答案并没有帮助.

这些是我的JAVA_OPTS(在远程主机上):

$ echo $JAVA_OPTS
-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=9010 -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.authenticate=false -Djava.rmi.server.hostname=192.168.59.99
Run Code Online (Sandbox Code Playgroud)

然后我按如下方式启动程序(这是针对spring xd,但我遇到了与其他java程序相同的问题).

$ bin/xd/xd-singlenode
Run Code Online (Sandbox Code Playgroud)

服务器进程似乎选择了以下选项:

$ ps -ef | grep single
vagrant  22938 19917 99 06:38 pts/2    00:00:03 /usr/lib/jvm/java-8- oracle/jre/bin/java -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=9010 -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.authenticate=false -Djava.rmi.server.hostname=192.168.59.99 -Dspring.application.name=admin -Dlogging.config=file:/home/vagrant/spring-xd-1.1.0.RELEASE/xd/config///xd-singlenode-logger.properties -Dxd.home=/home/vagrant/spring-xd-1.1.0.RELEASE/xd -Dspring.config.location=file:/home/vagrant/spring-xd-1.1.0.RELEASE/xd/config// -Dxd.config.home=file:/home/vagrant/spring-xd-1.1.0.RELEASE/xd/config// -Dspring.config.name=servers,application -Dxd.module.config.location=file:/home/vagrant/spring-xd-1.1.0.RELEASE/xd/config//modules/ -Dxd.module.config.name=modules -classpath (...)
Run Code Online (Sandbox Code Playgroud)

远程主机(ubuntu linux vm)上的java版本是:

$ java -version
java version "1.8.0_45"
Java(TM) SE Runtime Environment (build 1.8.0_45-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.45-b02, mixed mode)
Run Code Online (Sandbox Code Playgroud)

本地计算机(Mac OS)上的Java版本略有不同:

$ java -version    
java version "1.8.0_40"
Java(TM) …
Run Code Online (Sandbox Code Playgroud)

java monitoring jmx spring-boot

18
推荐指数
2
解决办法
3万
查看次数

集群中有多少个Kafka控制器,控制器的用途是什么?

Kafka集群中的Kafka控制器负责管理分区负责人和复制.

如果Kafka集群中有100个经纪人,那么控制器只是一个Kafka经纪人吗?那么在100个经纪人中,控制者是领导者吗?

你怎么知道哪个经纪人是控制人?

Kafka控制器的管理对Kafka系统管理至关重要吗?

apache-kafka

13
推荐指数
3
解决办法
8214
查看次数

使用 Kafka SASL/PLAIN 身份验证的 Spark 结构化流

有没有办法将 Spark 结构化流作业连接到受 SASL/PLAIN 身份验证保护的 Kafka 集群?

我在想类似的事情:

val df2 = spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_PLAINTEXT")
    .option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=...")
    .option("subscribe", "topic1")
    .load();
Run Code Online (Sandbox Code Playgroud)

看起来,虽然 Spark Structured Streaming 可以识别该kafka.bootstrap.servers选项,但它无法识别其他与 SASL 相关的选项。有不同的方法吗?

apache-kafka apache-spark spark-structured-streaming

11
推荐指数
1
解决办法
9282
查看次数

为什么Google Guava Preconditions的checkArgument没有返回值?

我真的很喜欢guava库如何允许简单的单行检查null:

public void methodWithNullCheck(String couldBeNull) {
    String definitelyNotNull = checkNotNull(couldBeNull);
    //...
}
Run Code Online (Sandbox Code Playgroud)

遗憾的是,对于简单的参数检查,您至少需要两行代码:

public void methodWithArgCheck(String couldBeEmpty) {
    checkArgument(!couldBeEmpty.isEmpty());
    String definitelyNotEmpty = couldBeEmpty;
    //...
}
Run Code Online (Sandbox Code Playgroud)

但它是可以添加方法,它可以做参数校验,如果校验成功返回一个值.以下是检查的示例以及如何实施:

public void methodWithEnhancedArgCheck(String couldBeEmpty) {
    String definitelyNotEmpty = EnhancedPreconditions.checkArgument(couldBeEmpty, !couldBeEmpty.isEmpty());
    //...
}

static class EnhancedPreconditions {
    public static <T> T checkArgument(T reference, boolean expression) {
        if (!expression) {
            throw new IllegalArgumentException();
        }

        return reference;
    }
}
Run Code Online (Sandbox Code Playgroud)

我只是想知道是通过设计,是否值得为此提出功能请求.

编辑:@Nizet,是的,检查方法可能是笨拙的.但是,对构造函数中的null进行检查看起来非常好,并且节省了调试NPE所花费的大量时间:

public class SomeClassWithDependency {

    private final SomeDependency someDependency;

    public SomeClassWithDependency(SomeDependency someDependency) {
        this.someDependency …
Run Code Online (Sandbox Code Playgroud)

java argument-passing guava

6
推荐指数
2
解决办法
6214
查看次数

通过JDBC连接到远程Mapr Hive

这个问题类似,但不一样,因为Hive JDBC getConnection没有返回.然而,这是一个远程连接.此时,Metastore存在于启动hiveserver2的目录中.

我们在远程计算机上有一个正在运行的映射器集群.我想使用Java JDBC连接到此集群上的Hive.

因此我们启动了配置单元服务器:

/opt/mapr/hive/hive-0.11/bin/hiveserver2

服务器进程的输出不包含任何错误消息.它监听netstat报告的端口10000.

我尝试按照https://cwiki.apache.org/confluence/display/Hive/HiveClient中的描述连接到服务器,从而用运行hiveserver2的服务器名称替换localhost:

Connection con = 
  DriverManager.getConnection("jdbc:hive://myserver.example.com:10000/default", "", "");
Run Code Online (Sandbox Code Playgroud)

然而,该计划完全依赖于这一声明.它似乎没有连接.

可能我需要提供用户名和密码?

最初我使用过驱动程序org.apache.hadoop.hive.jdbc.HiveDriver.

但是,如果hive2服务器正在运行,我似乎应该使用驱动程序org.apache.hive.jdbc.HiveDriver.现在我得到以下例外:

Exception in thread "main" java.sql.SQLException: Could not establish connection to jdbc:hive2://myserver.example.com:10000/default: Required field 'client_protocol' is unset! Struct:TOpenSessionReq(client_protocol:null)
at org.apache.hive.jdbc.HiveConnection.openSession(HiveConnection.java:246)
at org.apache.hive.jdbc.HiveConnection.<init>(HiveConnection.java:132)
at org.apache.hive.jdbc.HiveDriver.connect(HiveDriver.java:105)
at java.sql.DriverManager.getConnection(DriverManager.java:579)
at java.sql.DriverManager.getConnection(DriverManager.java:221)
at HiveJdbcClient.main(HiveJdbcClient.java:22)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)
Caused by: org.apache.thrift.TApplicationException: Required field 'client_protocol' is unset! Struct:TOpenSessionReq(client_protocol:null)
at org.apache.thrift.TApplicationException.read(TApplicationException.java:108)
at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:71)
at org.apache.hive.service.cli.thrift.TCLIService$Client.recv_OpenSession(TCLIService.java:144)
at org.apache.hive.service.cli.thrift.TCLIService$Client.OpenSession(TCLIService.java:131) …
Run Code Online (Sandbox Code Playgroud)

java hadoop hive jdbc mapr

6
推荐指数
1
解决办法
2万
查看次数

Spring Boot,Spring MVC JSON RequestBody:忽略未知属性

我们正在开发一个JSON Web服务,通​​过@RequestBody注释接收数据.如果请求中包含的属性与反序列化的bean不匹配,我们期望HTTP 400(错误请求)响应,但是简单地忽略该属性.这是一个例子:

@RestController
@Slf4j
public class TestController {

  @RequestMapping(method = RequestMethod.POST, value = "/query")
  public void parse(@RequestBody Query query) {
    log.info("Received query: {}", query.toString());
  }
}


@Data
class Query {
  private String from;
  private String to;
}
Run Code Online (Sandbox Code Playgroud)

发布时

{ "from" : "123", "to": "456", "foo" : "bar" }
Run Code Online (Sandbox Code Playgroud)

我们得到HTTP 200响应.在这种情况下,我们如何让Spring MVC返回HTTP 400?

任何帮助或指针都非常感谢.

请注意,这与此问题不同:如果RequestBody参数的某些属性为null,如何返回400 HTTP错误代码?.

由于该问题询问如何在缺少预期财产时返回400.

java spring json jackson spring-boot

6
推荐指数
2
解决办法
7728
查看次数

滑动窗口中的Kafka KStream相关消息事件

我认为Kafka Streams可以提供帮助,但是我找不到有帮助的文档/示例.

我找到了一个类似的问题,但它没有任何实现建议(我目前丢失的地方):Kafka Streams等待依赖对象的功能

我想做的事:

我想将Kafka主题中的相关记录关联到单个对象中并生成该新对象.例如,可能有5个消息记录通过唯一键相互关联 - 我想从这些相关对象构建一个新对象,并将其生成到新队列.

我希望所有相关事件都能在一小时内消耗掉.卡夫卡将其描述为滑动窗口.一旦ID为"123"的消息记录A到达消费者,应用程序必须至少等待一小时才能到达ID为"123"的剩余记录.在所有记录到达或一小时后,它们都是过期记录.

最后,一小时内收集的所有相关消息都用于创建新的Object,然后发送到另一个Kafka队列.

我遇到的问题.

Kafka中的滑动窗口似乎只有在将两个流连接在一起时才能工作.我们只有一个流连接到主题 - 我不知道为什么需要两个流或我们将如何实现这一点.我在网上找不到这个例子.我在Kafka中看到的所有流函数在收集相同密钥的事件时简单地聚合/缩减为简单值.例如,键出现的次数或累加某些值

这里有一些伪代码来描述我在说什么.如果功能存在,函数名称/语义将会不同.

    KStream<Key, Object> kstream = kStreamBuilder.stream(TOPIC);
    kstream.windowedBy(
    // One hour sliding Window
    )
    .collectAllRelatedKeys(
    // Collect all Records related to each key
    // map == HashMap<Key, ArrayList<Value>>
       map.get(key).add(value);
    )
    .transformAndProcess(
        if(ALL_EVENTS_COLLECTED) {
        // Create new Object from all related records
            newObject = 
            createNewObjectFromRelatedRecordsFunction(map.get(key));
            producer.send(newObject);   
        }
    )
Run Code Online (Sandbox Code Playgroud)

问题(谢谢你的帮助):

  1. 我怎么能用一个流的滑动窗口?
  2. 如何自定义KStream/KTable函数以收集Window中的所有相关事件并将新对象生成到另一个队列?
  3. acking/offset管理如何与Sliding Window流一起使用?
  4. 这可以保证完全一次交货吗?供参考:https://www.confluent.io/blog/enabling-exactly-kafka-streams/

apache-kafka rocksdb apache-kafka-streams

6
推荐指数
1
解决办法
1294
查看次数

Spring集成拖尾多个文件

我正在编写一个Spring集成应用程序,它应该包含多个文件(可能多达100个).我使用OSDelegatingFileTailingMessageProducer作为消息源,这是涉及多个过滤器和通道的管道的开始.

Tailing一个文件可以正常使用这个管道和一个用于通道和转换器的XML配置文件,但拖尾许多这些文件意味着这个XML配置的倍增,这在我看来并不是很好的编程实践.

我想我将不得不通过编程构建Spring应用程序上下文在Java中构建这些管道.还有其他选择吗?

编辑:

可能需要使用BeanFactoryPostProcessor:https://stackoverflow.com/a/15773000/2069922

java spring spring-integration

5
推荐指数
1
解决办法
527
查看次数