我不清楚为什么我们需要session.timeout.ms和max.poll.interval.ms以及何时使用一个或另一个或两者?似乎两者都表明时间协调器的上限将等待消费者获得心跳,然后再将其假死.
另外,对于基于KIP-62的 0.10.1.0+版本,它的表现如何?
我有一个新手问题.我有这个代码:
public class Main
{
public static void main(String[] args) throws InterruptedException
{
// TODO Auto-generated method stub
IntHolder aHolder=new IntHolder();
aHolder.Number=0;
IncrementorThread A= new IncrementorThread(1, aHolder);
IncrementorThread B= new IncrementorThread(2, aHolder);
IncrementorThread C= new IncrementorThread(3, aHolder);
A.start();
B.start();
C.start();
A.join();
B.join();
C.join();
System.out.println("All threads completed...");
}
}
Run Code Online (Sandbox Code Playgroud)
这将等待所有线程完成.如果我这样使用Executors:
public class Main
{
public static void main(String[] args)
{
// TODO Auto-generated method stub
IntHolder aHolder=new IntHolder();
aHolder.number=0;
IncrementalRunable A= new IncrementalRunable(1, aHolder);
IncrementalRunable B= new IncrementalRunable(2, aHolder); …Run Code Online (Sandbox Code Playgroud) 我想用jvisualvm(或jconsole)监视远程运行的java(spring boot)应用程序.在本地运行时,我可以在jvisualvm和jconsole中看到托管bean.远程运行时我无法连接.我尝试了几个不同的java进程(例如使用spring xd).在SO和Google上寻找答案并没有帮助.
这些是我的JAVA_OPTS(在远程主机上):
$ echo $JAVA_OPTS
-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=9010 -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.authenticate=false -Djava.rmi.server.hostname=192.168.59.99
Run Code Online (Sandbox Code Playgroud)
然后我按如下方式启动程序(这是针对spring xd,但我遇到了与其他java程序相同的问题).
$ bin/xd/xd-singlenode
Run Code Online (Sandbox Code Playgroud)
服务器进程似乎选择了以下选项:
$ ps -ef | grep single
vagrant 22938 19917 99 06:38 pts/2 00:00:03 /usr/lib/jvm/java-8- oracle/jre/bin/java -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=9010 -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.authenticate=false -Djava.rmi.server.hostname=192.168.59.99 -Dspring.application.name=admin -Dlogging.config=file:/home/vagrant/spring-xd-1.1.0.RELEASE/xd/config///xd-singlenode-logger.properties -Dxd.home=/home/vagrant/spring-xd-1.1.0.RELEASE/xd -Dspring.config.location=file:/home/vagrant/spring-xd-1.1.0.RELEASE/xd/config// -Dxd.config.home=file:/home/vagrant/spring-xd-1.1.0.RELEASE/xd/config// -Dspring.config.name=servers,application -Dxd.module.config.location=file:/home/vagrant/spring-xd-1.1.0.RELEASE/xd/config//modules/ -Dxd.module.config.name=modules -classpath (...)
Run Code Online (Sandbox Code Playgroud)
远程主机(ubuntu linux vm)上的java版本是:
$ java -version
java version "1.8.0_45"
Java(TM) SE Runtime Environment (build 1.8.0_45-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.45-b02, mixed mode)
Run Code Online (Sandbox Code Playgroud)
本地计算机(Mac OS)上的Java版本略有不同:
$ java -version
java version "1.8.0_40"
Java(TM) …Run Code Online (Sandbox Code Playgroud) Kafka集群中的Kafka控制器负责管理分区负责人和复制.
如果Kafka集群中有100个经纪人,那么控制器只是一个Kafka经纪人吗?那么在100个经纪人中,控制者是领导者吗?
你怎么知道哪个经纪人是控制人?
Kafka控制器的管理对Kafka系统管理至关重要吗?
有没有办法将 Spark 结构化流作业连接到受 SASL/PLAIN 身份验证保护的 Kafka 集群?
我在想类似的事情:
val df2 = spark.read.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_PLAINTEXT")
.option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=...")
.option("subscribe", "topic1")
.load();
Run Code Online (Sandbox Code Playgroud)
看起来,虽然 Spark Structured Streaming 可以识别该kafka.bootstrap.servers选项,但它无法识别其他与 SASL 相关的选项。有不同的方法吗?
我真的很喜欢guava库如何允许简单的单行检查null:
public void methodWithNullCheck(String couldBeNull) {
String definitelyNotNull = checkNotNull(couldBeNull);
//...
}
Run Code Online (Sandbox Code Playgroud)
遗憾的是,对于简单的参数检查,您至少需要两行代码:
public void methodWithArgCheck(String couldBeEmpty) {
checkArgument(!couldBeEmpty.isEmpty());
String definitelyNotEmpty = couldBeEmpty;
//...
}
Run Code Online (Sandbox Code Playgroud)
但它是可以添加方法,它可以做参数校验和,如果校验成功返回一个值.以下是检查的示例以及如何实施:
public void methodWithEnhancedArgCheck(String couldBeEmpty) {
String definitelyNotEmpty = EnhancedPreconditions.checkArgument(couldBeEmpty, !couldBeEmpty.isEmpty());
//...
}
static class EnhancedPreconditions {
public static <T> T checkArgument(T reference, boolean expression) {
if (!expression) {
throw new IllegalArgumentException();
}
return reference;
}
}
Run Code Online (Sandbox Code Playgroud)
我只是想知道是通过设计,是否值得为此提出功能请求.
编辑:@Nizet,是的,检查方法可能是笨拙的.但是,对构造函数中的null进行检查看起来非常好,并且节省了调试NPE所花费的大量时间:
public class SomeClassWithDependency {
private final SomeDependency someDependency;
public SomeClassWithDependency(SomeDependency someDependency) {
this.someDependency …Run Code Online (Sandbox Code Playgroud) 这个问题类似,但不一样,因为Hive JDBC getConnection没有返回.然而,这是一个远程连接.此时,Metastore存在于启动hiveserver2的目录中.
我们在远程计算机上有一个正在运行的映射器集群.我想使用Java JDBC连接到此集群上的Hive.
因此我们启动了配置单元服务器:
/opt/mapr/hive/hive-0.11/bin/hiveserver2
服务器进程的输出不包含任何错误消息.它监听netstat报告的端口10000.
我尝试按照https://cwiki.apache.org/confluence/display/Hive/HiveClient中的描述连接到服务器,从而用运行hiveserver2的服务器名称替换localhost:
Connection con =
DriverManager.getConnection("jdbc:hive://myserver.example.com:10000/default", "", "");
Run Code Online (Sandbox Code Playgroud)
然而,该计划完全依赖于这一声明.它似乎没有连接.
可能我需要提供用户名和密码?
最初我使用过驱动程序org.apache.hadoop.hive.jdbc.HiveDriver.
但是,如果hive2服务器正在运行,我似乎应该使用驱动程序org.apache.hive.jdbc.HiveDriver.现在我得到以下例外:
Exception in thread "main" java.sql.SQLException: Could not establish connection to jdbc:hive2://myserver.example.com:10000/default: Required field 'client_protocol' is unset! Struct:TOpenSessionReq(client_protocol:null)
at org.apache.hive.jdbc.HiveConnection.openSession(HiveConnection.java:246)
at org.apache.hive.jdbc.HiveConnection.<init>(HiveConnection.java:132)
at org.apache.hive.jdbc.HiveDriver.connect(HiveDriver.java:105)
at java.sql.DriverManager.getConnection(DriverManager.java:579)
at java.sql.DriverManager.getConnection(DriverManager.java:221)
at HiveJdbcClient.main(HiveJdbcClient.java:22)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)
Caused by: org.apache.thrift.TApplicationException: Required field 'client_protocol' is unset! Struct:TOpenSessionReq(client_protocol:null)
at org.apache.thrift.TApplicationException.read(TApplicationException.java:108)
at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:71)
at org.apache.hive.service.cli.thrift.TCLIService$Client.recv_OpenSession(TCLIService.java:144)
at org.apache.hive.service.cli.thrift.TCLIService$Client.OpenSession(TCLIService.java:131) …Run Code Online (Sandbox Code Playgroud) 我们正在开发一个JSON Web服务,通过@RequestBody注释接收数据.如果请求中包含的属性与反序列化的bean不匹配,我们期望HTTP 400(错误请求)响应,但是简单地忽略该属性.这是一个例子:
@RestController
@Slf4j
public class TestController {
@RequestMapping(method = RequestMethod.POST, value = "/query")
public void parse(@RequestBody Query query) {
log.info("Received query: {}", query.toString());
}
}
@Data
class Query {
private String from;
private String to;
}
Run Code Online (Sandbox Code Playgroud)
发布时
{ "from" : "123", "to": "456", "foo" : "bar" }
Run Code Online (Sandbox Code Playgroud)
我们得到HTTP 200响应.在这种情况下,我们如何让Spring MVC返回HTTP 400?
任何帮助或指针都非常感谢.
请注意,这与此问题不同:如果RequestBody参数的某些属性为null,如何返回400 HTTP错误代码?.
由于该问题询问如何在缺少预期财产时返回400.
我认为Kafka Streams可以提供帮助,但是我找不到有帮助的文档/示例.
我找到了一个类似的问题,但它没有任何实现建议(我目前丢失的地方):Kafka Streams等待依赖对象的功能
我想做的事:
我想将Kafka主题中的相关记录关联到单个对象中并生成该新对象.例如,可能有5个消息记录通过唯一键相互关联 - 我想从这些相关对象构建一个新对象,并将其生成到新队列.
我希望所有相关事件都能在一小时内消耗掉.卡夫卡将其描述为滑动窗口.一旦ID为"123"的消息记录A到达消费者,应用程序必须至少等待一小时才能到达ID为"123"的剩余记录.在所有记录到达或一小时后,它们都是过期记录.
最后,一小时内收集的所有相关消息都用于创建新的Object,然后发送到另一个Kafka队列.
我遇到的问题.
Kafka中的滑动窗口似乎只有在将两个流连接在一起时才能工作.我们只有一个流连接到主题 - 我不知道为什么需要两个流或我们将如何实现这一点.我在网上找不到这个例子.我在Kafka中看到的所有流函数在收集相同密钥的事件时简单地聚合/缩减为简单值.例如,键出现的次数或累加某些值
这里有一些伪代码来描述我在说什么.如果功能存在,函数名称/语义将会不同.
KStream<Key, Object> kstream = kStreamBuilder.stream(TOPIC);
kstream.windowedBy(
// One hour sliding Window
)
.collectAllRelatedKeys(
// Collect all Records related to each key
// map == HashMap<Key, ArrayList<Value>>
map.get(key).add(value);
)
.transformAndProcess(
if(ALL_EVENTS_COLLECTED) {
// Create new Object from all related records
newObject =
createNewObjectFromRelatedRecordsFunction(map.get(key));
producer.send(newObject);
}
)
Run Code Online (Sandbox Code Playgroud)
问题(谢谢你的帮助):
我正在编写一个Spring集成应用程序,它应该包含多个文件(可能多达100个).我使用OSDelegatingFileTailingMessageProducer作为消息源,这是涉及多个过滤器和通道的管道的开始.
Tailing一个文件可以正常使用这个管道和一个用于通道和转换器的XML配置文件,但拖尾许多这些文件意味着这个XML配置的倍增,这在我看来并不是很好的编程实践.
我想我将不得不通过编程构建Spring应用程序上下文在Java中构建这些管道.还有其他选择吗?
编辑:
可能需要使用BeanFactoryPostProcessor:https://stackoverflow.com/a/15773000/2069922?
java ×6
apache-kafka ×4
spring ×2
spring-boot ×2
apache-spark ×1
guava ×1
hadoop ×1
hive ×1
jackson ×1
jdbc ×1
jmx ×1
json ×1
mapr ×1
monitoring ×1
rocksdb ×1