什么是kafka(版本0.10)使用者的默认行为,如果它尝试重新加入使用者组.我正在为一个消费者群体使用单个消费者,但看起来它重新加入了.每10分钟后,它会在消费者日志中打印以下行.
2016-08-11 13:54:53,803 INFO oakcciConsumerCoordinator [pool-5-thread-1]****撤销之前为group image-consumer-group分配的分区****[]
2016-08-11 13:54:53,803 INFO oakcciAbstractCoordinator [pool-5-thread-1] (重新)加入群组形象 - 消费群体
2016-08-11 14:04:53,992 INFO oakcciAbstractCoordinator [pool-5-thread-1]将协调员标记为群组图像 - 消费者群体
2016-08-11 14:04:54,095 INFO oakcciAbstractCoordinator [pool-5-thread-1]发现了group image-consumer-group的协调员.
2016-08-11 14:04:54,096 INFO oakcciAbstractCoordinator [pool-5-thread-1] (Re-)加入group image-consumer-group
重启消费者应用程序没有帮助.
我想对任何不成功的响应都有failfast行为,如果一切都成功,那么我将返回上一个成功的响应,如下面的代码所示.
for(int i=0;i<input.size(); i++){
Data data = service.getData(input.get(i));
if(data.isSuccessful() && i==input.size()-1){
return data;
}else if(!data.isSuccessful()){
return data;
}else{
return null;
}
}
Run Code Online (Sandbox Code Playgroud)
我试图用流替换上面提到的代码,但是到目前为止还没能做到.主要问题是我无法模仿java8流代码中的i(索引)变量行为.
resp = input.stream().map((input)->{service.getData()}).filter(
(resp)->{
if(!resp.isSuccessful())
return true;
else if(resp.isSuccessful() && last resp)//if somehow I figure out last element
return true;
else
return false;}).findFirst();
Run Code Online (Sandbox Code Playgroud) 我想通过CloseableHttpResponse
基于 url返回不同的对象来模拟行为。因为URL1
我想给出302
回应,因为url2
我想给出200
确定的回应。此测试下的方法将url
作为输入并在HttpGet
内部创建一个请求对象并使用httpresponse
对象执行某些操作。但我无法匹配这个HttpGet
论点。有什么办法可以测试这个方法。PShttpClient
也是一个模拟对象。以下代码不起作用,因为期望无法模拟 new HttpGet(Url)
。
CloseableHttpResponse httpResponse = mock(CloseableHttpResponse.class);
when(httpClient.execute(new HttpGet(URL1))).thenReturn(httpResponse);
when(httpResponse.getStatusLine()).thenReturn(new BasicStatusLine(new ProtocolVersion("1.1",0,0),HttpStatus.SC_MOVED_PERMANENTLY,""));
when(httpResponse.getHeaders(HttpHeaders.LOCATION)).thenReturn( new Header[]{new BasicHeader(HttpHeaders.LOCATION, URL2)});
CloseableHttpResponse httpResponse1 = mock(CloseableHttpResponse.class);
when(httpClient.execute(new HttpGet(URL2))).thenReturn(httpResponse1);
when(httpResponse.getStatusLine()).thenReturn(new BasicStatusLine(new ProtocolVersion("1.1",0,0),HttpStatus.SC_OK,""));
when(httpResponse.getHeaders(HttpHeaders.CONTENT_LENGTH)).thenReturn( new Header[]{new BasicHeader(HttpHeaders.CONTENT_LENGTH, "0")});
Run Code Online (Sandbox Code Playgroud)
提前致谢。
下面的代码打印:
should Not have called
hello world
Run Code Online (Sandbox Code Playgroud)
为什么orElse
即使有包含值的Optional也要执行?
public static void main(String[] args){
String o = Optional.ofNullable("world").map(str -> "hello" + str).orElse(dontCallMe());
System.out.println(o);
}
private static String dontCallMe() {
System.out.println("should Not have called");
return "why god why";
}
Run Code Online (Sandbox Code Playgroud) java ×2
java-8 ×2
apache-kafka ×1
java-stream ×1
lambda ×1
mockito ×1
optional ×1
unit-testing ×1