我无法解释这一点,但我在其他人的代码中发现了这种现象:
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.util.stream.Stream;
import org.junit.Test;
public class TestDidWeBreakJavaAgain
{
@Test
public void testIoInSerialStream()
{
doTest(false);
}
@Test
public void testIoInParallelStream()
{
doTest(true);
}
private void doTest(boolean parallel)
{
Stream<String> stream = Stream.of("1", "2", "3");
if (parallel)
{
stream = stream.parallel();
}
stream.forEach(name -> {
try
{
Files.createTempFile(name, ".dat");
}
catch (IOException e)
{
throw new UncheckedIOException("Failed to create temp file", e);
}
});
}
}
Run Code Online (Sandbox Code Playgroud)
在启用安全管理器的情况下运行时,仅仅调用parallel()流,或者parallelStream()从集合中获取流时,似乎可以保证所有执行I/O的尝试都会抛出SecurityException.(最有可能的,它调用任何方法可以抛出 …
我有一个类似这样的程序
public class Test implements Runnable
{
public int local_counter
public static int global_counter
// Barrier waits for as many threads as we launch + main thread
public static CyclicBarrier thread_barrier = new CyclicBarrier (n_threads + 1);
/* Constructors etc. */
public void run()
{
for (int i=0; i<100; i++)
{
thread_barrier.await();
local_counter = 0;
for(int j=0 ; j = 20 ; j++)
local_counter++;
thread_barrier.await();
}
}
public void main()
{
/* Create and launch some threads, stored on thread_array …Run Code Online (Sandbox Code Playgroud) java parallel-processing concurrency synchronization java.util.concurrent
我想全局替换Java并行流默认使用的公共线程池,例如for
IntStream.range(0,100).parallel().forEach(i -> {
doWork();
});
Run Code Online (Sandbox Code Playgroud)
我知道可以通过向专用线程池提交此类指令来使用专用的ForkJoinPool(请参阅Java 8并行流中的自定义线程池).这里的问题是
Executors.newFixedThreadPool(10)?备注:我之所以要更换F/J池,是因为它似乎有一个错误,使其无法用于嵌套并行循环.
嵌套并行循环的性能很差,可能导致死锁,请参阅http://christian-fries.de/blog/files/2014-nested-java-8-parallel-foreach.html
例如:以下代码导致死锁:
// Outer loop
IntStream.range(0,24).parallel().forEach(i -> {
// (omitted:) do some heavy work here (consuming majority of time)
// Need to synchronize for a small "subtask" (e.g. updating a result)
synchronized(this) {
// Inner loop (does s.th. completely free of side-effects, i.e. expected to work)
IntStream.range(0,100).parallel().forEach(j -> {
// do work here
});
}
});
Run Code Online (Sandbox Code Playgroud)
(即使在"在这里工作"没有任何额外的代码,因为并行性设置为<12).
我的问题是如何更换FJP.如果您想讨论嵌套并行循环,您可能会检查嵌套Java 8并行forEach循环执行不佳.这种行为有望吗?.
默认情况下,Java流由公共线程池处理,该线程池使用默认参数构造.正如在另一个问题中所回答的那样,可以通过指定自定义池或通过设置java.util.concurrent.ForkJoinPool.common.parallelism系统参数来调整这些默认值.
但是,我无法通过这两种方法中的任何一种来增加分配给流处理的线程数.例如,请考虑以下程序,该程序处理第一个参数中指定的文件中包含的IP地址列表,并输出已解析的地址.在具有大约13000个唯一IP地址的文件上运行此操作,我看到使用Oracle Java Mission Control只需16个线程.其中只有五名是ForkJoinPool工人.然而,这个特定任务将从更多线程中受益,因为线程大部分时间都在等待DNS响应.所以我的问题是,我怎样才能真正增加使用的线程数?
我在三种环境下尝试过该程序; 这些是操作系统报告的线程数.
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.Files;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.ForkJoinPool;
/** Resolve IP addresses in file args[0] using 100 threads */
public class Resolve100 {
/** Resolve the passed IP address into a name */
static String addressName(String ipAddress) {
try {
return …Run Code Online (Sandbox Code Playgroud) 我编写了一个使用Java流的方法,它简单地遍历对象列表并返回true/false,满足某些条件
Java方法:
boolean method(SampleObj sampleObj) {
List testList = invokeSomeMethod();
int result = testList
.parallelStream()
.filter(listObj -> (listObj.getAttr() = 1))
.count(listObj -> listObj.isAttr4());
return (result > 10);
}
Run Code Online (Sandbox Code Playgroud)
我也写了一个模拟测试用例.当我执行测试用例时,测试成功,但是我得到项目自定义错误,指出所有创建的线程都没有关闭.
我甚至尝试使用带有try-with-resources的流,而noo没有帮助.
模拟测试:
@Test
public void testSomeMethod() {
SampleObj sampleObj1 = new SampleObj(10, 20, 30, true);
SampleObj sampleObj2 = new SampleObj(10, 20, 30, true);
SampleObj sampleObj3 = new SampleObj(10, 20, 30, false);
SampleObj sampleObjTest = new SampleObj(10, 20, 30, true);
List<SampleObj> testList = new ArrayList<SampleObj>();
testList.add(sampleObj1);
testList.add(sampleObj2);
testList.add(sampleObj3);
when(mockedAttribute.invokeSomeMethod()).thenReturn(nodeList);
ClassToBeTested classTest = …Run Code Online (Sandbox Code Playgroud) 我通过流式传输对象列表来调用异步客户端方法.该方法返回Future.
迭代调用后返回的Futures列表的最佳方法是什么(以便处理那些首先出现的Future)?
注意:异步客户端仅返回Future not CompletableFuture.
以下是代码:
List<Future<Object>> listOfFuture = objectsToProcess.parallelStream()
.map((object) -> {
/* calling an async client returning a Future<Object> */ })
.collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud) 我是java的新手,我想使用执行器服务或使用java中的任何其他方法并行化嵌套for循环.我想创建一些固定数量的线程,以便线程不会完全获取CPU.
for(SellerNames sellerNames : sellerDataList) {
for(String selleName : sellerNames) {
//getSellerAddress(sellerName)
//parallize this task
}
}
Run Code Online (Sandbox Code Playgroud)
sellerDataList = 1000的大小和sellerNames = 5000的大小.
现在我想创建10个线程并将相同的任务块分配给每个线程.这是为了我的sellerDataList,第一个线程应该获得500个名称的地址,第二个线程应该获得下一个500个名称的地址,依此类推.
做这份工作的最佳方法是什么?
我正在使用CompletableFuture来异步执行从列表源生成的流.
所以我正在测试重载方法,即CompletableFuture的"supplyAsync",其中一个方法只接受单个供应商参数,另一个方法接受供应商参数和执行者参数.以下是两者的文档:
一
supplyAsync(供应商供应商)
返回由ForkJoinPool.commonPool()中运行的任务异步完成的新CompletableFuture,其中包含通过调用给定供应商获得的值.
第二
supplyAsync(供应商供应商,执行执行人)
返回由给定执行程序中运行的任务异步完成的新CompletableFuture,其中包含通过调用给定供应商获得的值.
这是我的测试类:
public class TestCompleteableAndParallelStream {
public static void main(String[] args) {
List<MyTask> tasks = IntStream.range(0, 10)
.mapToObj(i -> new MyTask(1))
.collect(Collectors.toList());
useCompletableFuture(tasks);
useCompletableFutureWithExecutor(tasks);
}
public static void useCompletableFutureWithExecutor(List<MyTask> tasks) {
long start = System.nanoTime();
ExecutorService executor = Executors.newFixedThreadPool(Math.min(tasks.size(), 10));
List<CompletableFuture<Integer>> futures =
tasks.stream()
.map(t -> CompletableFuture.supplyAsync(() -> t.calculate(), executor))
.collect(Collectors.toList());
List<Integer> result =
futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration); …Run Code Online (Sandbox Code Playgroud) executorservice java-8 threadpoolexecutor forkjoinpool completable-future
当我尝试从并行流中的安全上下文获取主体时,如果主体不在主线程中,它总是返回 null。
当用户通过身份验证时,以下代码段失败:
listOfSomething.parallelStream()
.foreach(el -> {
if (SecurityContextHolder.getContext().getAuthentication().getPrincipal() == null){
throw new RuntimeException();
}});
Run Code Online (Sandbox Code Playgroud)
文档说:
定义与当前执行线程关联的最小安全信息的接口。
但是,有什么办法可以做到吗?它在主线程中启动并使用 ForkJoinPool
谢谢你!
这是设计和代码问题的很大结合。
用例
- 给定范围内的许多日志文件(2MB - 2GB),我需要解析每个日志并应用一些处理,生成 Java POJO.
- 对于这个问题,我们假设我们只有1日志文件
- 另外,这个想法是充分利用 System. 可以使用多个核心。
替代方案 1
- 打开文件(同步),读取每一行,生成POJOs
FileActor -> read each line -> List<POJO>
Run Code Online (Sandbox Code Playgroud)
优点:简单易懂
缺点:串行过程,没有利用系统中的多个核心
替代方案 2
- 打开文件(同步),读取N行(N可配置),传递给不同的参与者进行处理
/ LogLineProcessActor 1
FileActor -> LogLineProcessRouter (with 10 Actors) -- LogLineProcessActor 2
\ LogLineProcessActor 10
Run Code Online (Sandbox Code Playgroud)
优点一些并行化,通过使用不同的参与者来处理部分线路。参与者将利用系统中的可用核心(?如何,可能?)
缺点仍然是串行的,因为文件以串行方式读取
问题
- 以上选择是一个不错的选择吗?
- 有更好的选择吗?
请在此提供宝贵的想法
多谢
java ×9
concurrency ×3
java-8 ×3
java-stream ×3
forkjoinpool ×2
stream ×2
akka ×1
file-io ×1
future ×1
spring ×1
spring-boot ×1
threadpool ×1
typesafe ×1
unit-testing ×1