这是java.util.concurrent.CountedCompleter
类的代码片段(JDK 1.8.0_25).
/**
* If the pending count is nonzero, decrements the count;
* otherwise invokes {@link #onCompletion(CountedCompleter)}
* and then similarly tries to complete this task's completer,
* if one exists, else marks this task as complete.
*/
public final void tryComplete() {
CountedCompleter<?> a = this, s = a;
for (int c;;) {
if ((c = a.pending) == 0) {
a.onCompletion(s);
if ((a = (s = a).completer) == null) {
s.quietlyComplete();
return;
}
}
else if …
Run Code Online (Sandbox Code Playgroud) 我们开发了一个使用Java8并行流的API调用,并且我们获得了非常好的性能,与进行压力测试时的顺序处理相比几乎翻了一番.
我知道这取决于用例,但我将它用于加密操作,所以我认为这是一个很好的用例.
但是,我读了很多鼓励他们非常小心的文章.还有文章讨论它们内部设计不是很好,就像这里一样.
因此:准备并行流生产; 它们被广泛用于生产系统吗?
我们在java中有三种不同的多线程技术 - Fork/Join池,Executor Service和CountDownLatch
Fork/Join pool(http://www.javacodegeeks.com/2011/02/java-forkjoin-parallel-programming.html)
Fork/Join框架旨在使分而治之的算法易于并行化.这种类型的算法非常适合于可以分为两个或更多相同类型的子问题的问题.他们使用递归将问题分解为简单的任务,直到这些变得足够简单直接解决.然后组合子问题的解决方案以给出原始问题的解决方案
ExecutorService是一个扩展Executor类并表示异步执行的接口.它为我们提供了管理结束和检测异步任务进度的机制.
invokeAll():执行给定的任务,返回一个Futures列表,其中包含完成后的状态和结果.对于返回列表的每个元素,Future.isDone()都为true.
CountDownLatch :( http://examples.javacodegeeks.com/core-java/util/concurrent/countdownlatch-concurrent/java-util-concurrent-countdownlatch-example/)
CountDownLatch用于同步,以允许一个或多个线程等待,直到在其他线程中执行的一组操作完成.
我的假设:
在这两种替代方案中,只有在完成所有任务/线程后才能知道最终结果.
这三种选择是互补的还是互补的?
java multithreading executorservice countdownlatch forkjoinpool
我想使用java的fork join模型实现bitonic排序.这是分拣机的代码
import java.util.concurrent.RecursiveAction;
public class BitonicSortTask extends RecursiveAction
{
private final int array[];
private final int low;
private final int high;
private final int dir;
public BitonicSortTask(int array[],int low,int high,int dir)
{
this.array = array;
this.low = low;
this.high = high;
this.dir= dir;
}
@Override
protected void compute()
{
if(high>1)
{
int temp = high/2;
BitonicSortTask left = new BitonicSortTask(array, low, temp,1);
BitonicSortTask right = new BitonicSortTask(array, temp+1,high,0);
invokeAll(left, right);
BitonicMerge(array, low, high, dir);
}
}
private void BitonicMerge(int[] …
Run Code Online (Sandbox Code Playgroud) 我有一个fork连接调度程序配置为仅使用akka http的客户端(通过主机连接池)的服务:
my-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 256
parallelism-factor = 128.0
parallelism-max = 2048
}
}
Run Code Online (Sandbox Code Playgroud)
服务逻辑唯一要做的就是从外部源请求,使用jawn解组它,然后将jawn ast转换为case类:
def get(uri: Uri)[T]: Future[T] = {
for {
response <- request(uri)
json <- Unmarshal(response.entity).to[Try[JValue]]
} yield json.transformTo[T]
}
Run Code Online (Sandbox Code Playgroud)
我想知道为这种工作负载使用固定线程池是否更有效.这项服务大约需要150 req/s,我想将CPU使用率保持在1 CPU以下(目前它的徘徊在1.25-1.5左右).
从这个链接,我只是部分理解,至少在某些时候,java嵌套并行流存在问题.但是,我无法推断出以下问题的答案:
假设我有一个外部srtream和一个内部流,两者都使用并行流.事实证明,根据我的计算,如果内部流首先完全并行完成,那么它将更高效(由于数据位置,即L1/L2/L3 CPU缓存中的缓存)(如果且仅cpu核心可用)做外部流.我认为这对大多数人的情况都是如此.所以我的问题是:
Java会首先并行执行内部流,然后在outerstream上工作吗?如果是这样,它是在编译时还是在运行时做出决定?如果在运行时,JIT甚至足够聪明地意识到如果内部流确实具有比核心数量(32)更多的元素(例如数百个),那么它肯定应该使用所有32个内核来处理在从外部流移动下一个元素之前的内部流; 但是,如果元素的数量很小(例如<32),则可以"并行处理"来自"下一个"外部流的元素的元素.
我可以通过提供代码片段来解释我的问题:
public static void main(final String[] a) {
Stream.of(1, 2, 3, 4).map(i -> ForkJoinPool.commonPool().submit(new RecursiveAction() {
@Override
protected void compute() {
System.out.println(Thread.currentThread());
}
})).forEach(ForkJoinTask::join);
}
Run Code Online (Sandbox Code Playgroud)
在我的笔记本电脑上运行时,它有4个核心,这打印:
Thread[main,5,main]
Thread[ForkJoinPool.commonPool-worker-1,5,main]
Thread[main,5,main]
Thread[ForkJoinPool.commonPool-worker-1,5,main]
Run Code Online (Sandbox Code Playgroud)
为什么某些任务在主线程中运行,主线程是公共fork连接线程池之外的线程?
创建自定义fork join线程池时,不会发生这种情况:
public static void main(final String[] a) {
final ForkJoinPool p = new ForkJoinPool(4);
Stream.of(1, 2, 3, 4).map(index -> p.submit(new RecursiveAction() {
@Override
protected void compute() {
System.out.println(Thread.currentThread());
}
})).forEach(ForkJoinTask::join);
}
Thread[ForkJoinPool-1-worker-1,5,main]
Thread[ForkJoinPool-1-worker-1,5,main]
Thread[ForkJoinPool-1-worker-1,5,main]
Thread[ForkJoinPool-1-worker-1,5,main]
Run Code Online (Sandbox Code Playgroud)
那么,换句话说,公共池有什么特别之处?提供这些知识,在公共池中执行长时间运行的任务是明智的还是不明智的想法?
在下面的代码中,无论我将 i 的最大值设置为多少,线程总数都不会超过 13。它使用的是什么线程池?我在哪里可以找到它的默认设置?
public static void main(String[] args) {
// write your code here
for (int i = 0; i <= 5; i++) {
System.out.println("kick off" + i);
CompletableFuture.runAsync(() -> {
try {
Thread.sleep(1000);
System.out.println(java.lang.Thread.activeCount());
}
catch (Exception e) {
System.out.println("error");
}
});
}
System.out.println(java.lang.Thread.activeCount());
try {
Thread.sleep(10000);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
Run Code Online (Sandbox Code Playgroud) forkjoinpool ×8
java ×7
concurrency ×2
fork-join ×2
java-8 ×2
akka ×1
akka-http ×1
java-stream ×1
production ×1
scala ×1