相关疑难解决方法(0)

SecurityException来自并行流中的I/O代码

我无法解释这一点,但我在其他人的代码中发现了这种现象:

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.util.stream.Stream;

import org.junit.Test;

public class TestDidWeBreakJavaAgain
{
    @Test
    public void testIoInSerialStream()
    {
        doTest(false);
    }

    @Test
    public void testIoInParallelStream()
    {
        doTest(true);
    }

    private void doTest(boolean parallel)
    {
        Stream<String> stream = Stream.of("1", "2", "3");
        if (parallel)
        {
            stream = stream.parallel();
        }
        stream.forEach(name -> {
            try
            {
                Files.createTempFile(name, ".dat");
            }
            catch (IOException e)
            {
                throw new UncheckedIOException("Failed to create temp file", e);
            }
        });
    }
}
Run Code Online (Sandbox Code Playgroud)

在启用安全管理器的情况下运行时,仅仅调用parallel()流,或者parallelStream()从集合中获取流时,似乎可以保证所有执行I/O的尝试都会抛出SecurityException.(最有可能的,它调用任何方法可以抛出 …

java parallel-processing securitymanager java-8 java-stream

9
推荐指数
1
解决办法
992
查看次数

我怎样才能重写这个主线程 - 工作线程同步

我有一个类似这样的程序

public class Test implements Runnable
{
    public        int local_counter
    public static int global_counter
    // Barrier waits for as many threads as we launch + main thread
    public static CyclicBarrier thread_barrier = new CyclicBarrier (n_threads + 1);

    /* Constructors etc. */

    public void run()
    {
        for (int i=0; i<100; i++)
        {
            thread_barrier.await();
            local_counter = 0;
            for(int j=0 ; j = 20 ; j++)
                local_counter++;
            thread_barrier.await();
        }
    }

    public void main()
    {
        /* Create and launch some threads, stored on thread_array …
Run Code Online (Sandbox Code Playgroud)

java parallel-processing concurrency synchronization java.util.concurrent

8
推荐指数
1
解决办法
359
查看次数

如何(全局)替换Java并行流的公共线程池后端?

我想全局替换Java并行流默认使用的公共线程池,例如for

IntStream.range(0,100).parallel().forEach(i -> {
    doWork();
});
Run Code Online (Sandbox Code Playgroud)

我知道可以通过向专用线程池提交此类指令来使用专用的ForkJoinPool(请参阅Java 8并行流中的自定义线程池).这里的问题是

  • 是否有可能通过其他实现替换常见的ForkJoinPool(比如说Executors.newFixedThreadPool(10)
  • 是否可以通过某些全局设置来实现,例如,某些JVM属性?

备注:我之所以要更换F/J池,是因为它似乎有一个错误,使其无法用于嵌套并行循环.

嵌套并行循环的性能很差,可能导致死锁,请参阅http://christian-fries.de/blog/files/2014-nested-java-8-parallel-foreach.html

例如:以下代码导致死锁:

// Outer loop
IntStream.range(0,24).parallel().forEach(i -> {

    // (omitted:) do some heavy work here (consuming majority of time)

    // Need to synchronize for a small "subtask" (e.g. updating a result)
    synchronized(this) {
        // Inner loop (does s.th. completely free of side-effects, i.e. expected to work)
        IntStream.range(0,100).parallel().forEach(j -> {
            // do work here
        });
    }
});
Run Code Online (Sandbox Code Playgroud)

(即使在"在这里工作"没有任何额外的代码,因为并行性设置为<12).

我的问题是如何更换FJP.如果您想讨论嵌套并行循环,您可能会检查嵌套Java 8并行forEach循环执行不佳.这种行为有望吗?.

java parallel-processing concurrency multithreading stream

7
推荐指数
1
解决办法
1717
查看次数

如何处理超过默认线程数的Java流?

默认情况下,Java流由公共线程池处理,该线程池使用默认参数构造.正如在另一个问题中所回答的那样,可以通过指定自定义池或通过设置java.util.concurrent.ForkJoinPool.common.parallelism系统参数来调整这些默认值.

但是,我无法通过这两种方法中的任何一种来增加分配给流处理的线程数.例如,请考虑以下程序,该程序处理第一个参数中指定的文件中包含的IP地址列表,并输出已解析的地址.在具有大约13000个唯一IP地址的文件上运行此操作,我看到使用Oracle Java Mission Control只需16个线程.其中只有五名是ForkJoinPool工人.然而,这个特定任务将从更多线程中受益,因为线程大部分时间都在等待DNS响应.所以我的问题是,我怎样才能真正增加使用的线程数?

我在三种环境下尝试过该程序; 这些是操作系统报告的线程数.

  • Java SE Runtime Environment在运行Windows 7:17线程的8核计算机上构建1.8.0_73-b02
  • Java SE运行时环境在运行OS X Darwin 15.2.0的23核机器上构建1.8.0_66-b17:23个线程
  • openjdk版本1.8.0_72在一台运行FreeBSD 11.0的44核机器上:44个线程

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.Files;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.ForkJoinPool;

/** Resolve IP addresses in file args[0] using 100 threads */
public class Resolve100 {
    /** Resolve the passed IP address into a name */
    static String addressName(String ipAddress) {
        try {
            return …
Run Code Online (Sandbox Code Playgroud)

java multithreading forkjoinpool java-stream

7
推荐指数
1
解决办法
2023
查看次数

Java Parallel Streams关闭线程

我编写了一个使用Java流的方法,它简单地遍历对象列表并返回true/false,满足某些条件

Java方法:

 boolean method(SampleObj sampleObj) {

   List testList = invokeSomeMethod();
   int result = testList
            .parallelStream()
            .filter(listObj -> (listObj.getAttr() = 1))
            .count(listObj -> listObj.isAttr4());

   return (result > 10);

 }
Run Code Online (Sandbox Code Playgroud)

我也写了一个模拟测试用例.当我执行测试用例时,测试成功,但是我得到项目自定义错误,指出所有创建的线程都没有关闭.

我甚至尝试使用带有try-with-resources的流,而noo没有帮助.

模拟测试:

@Test
public void testSomeMethod() {
    SampleObj sampleObj1 = new SampleObj(10, 20, 30, true);
    SampleObj sampleObj2 = new SampleObj(10, 20, 30, true);
    SampleObj sampleObj3 = new SampleObj(10, 20, 30, false);
    SampleObj sampleObjTest = new SampleObj(10, 20, 30, true);

    List<SampleObj> testList = new ArrayList<SampleObj>();
    testList.add(sampleObj1);
    testList.add(sampleObj2);
    testList.add(sampleObj3);

    when(mockedAttribute.invokeSomeMethod()).thenReturn(nodeList);

    ClassToBeTested classTest = …
Run Code Online (Sandbox Code Playgroud)

java multithreading unit-testing java-stream

7
推荐指数
1
解决办法
703
查看次数

在期货清单上流式传输的最有效方式

我通过流式传输对象列表来调用异步客户端方法.该方法返回Future.

迭代调用后返回的Futures列表的最佳方法是什么(以便处理那些首先出现的Future)?

注意:异步客户端仅返回Future not CompletableFuture.

以下是代码:

List<Future<Object>> listOfFuture = objectsToProcess.parallelStream()
    .map((object) -> {
        /* calling an async client returning a Future<Object> */ })
    .collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)

java concurrency future java-8 completable-future

6
推荐指数
1
解决办法
4363
查看次数

使用多线程并行化Java中的for循环

我是java的新手,我想使用执行器服务或使用java中的任何其他方法并行化嵌套for循环.我想创建一些固定数量的线程,以便线程不会完全获取CPU.

    for(SellerNames sellerNames : sellerDataList) {
        for(String selleName : sellerNames) {
        //getSellerAddress(sellerName)
        //parallize this task
        }
    }
Run Code Online (Sandbox Code Playgroud)

sellerDataList = 1000的大小和sellerNames = 5000的大小.

现在我想创建10个线程并将相同的任务块分配给每个线程.这是为了我的sellerDataList,第一个线程应该获得500个名称的地址,第二个线程应该获得下一个500个名称的地址,依此类推.
做这份工作的最佳方法是什么?

java parallel-processing multithreading threadpool

6
推荐指数
1
解决办法
1万
查看次数

默认的ForkJoinPool执行器需要很长时间

我正在使用CompletableFuture来异步执行从列表源生成的流.

所以我正在测试重载方法,即CompletableFuture的"supplyAsync",其中一个方法只接受单个供应商参数,另一个方法接受供应商参数和执行者参数.以下是两者的文档:

supplyAsync(供应商供应商)

返回由ForkJoinPool.commonPool()中运行的任务异步完成的新CompletableFuture,其中包含通过调用给定供应商获得的值.

第二

supplyAsync(供应商供应商,执行执行人)

返回由给定执行程序中运行的任务异步完成的新CompletableFuture,其中包含通过调用给定供应商获得的值.

这是我的测试类:

public class TestCompleteableAndParallelStream {

    public static void main(String[] args) {
        List<MyTask> tasks = IntStream.range(0, 10)
                .mapToObj(i -> new MyTask(1))
                .collect(Collectors.toList());

        useCompletableFuture(tasks);

        useCompletableFutureWithExecutor(tasks);

    }

    public static void useCompletableFutureWithExecutor(List<MyTask> tasks) {
          long start = System.nanoTime();
          ExecutorService executor = Executors.newFixedThreadPool(Math.min(tasks.size(), 10));
          List<CompletableFuture<Integer>> futures =
              tasks.stream()
                   .map(t -> CompletableFuture.supplyAsync(() -> t.calculate(), executor))
                   .collect(Collectors.toList());

          List<Integer> result =
              futures.stream()
                     .map(CompletableFuture::join)
                     .collect(Collectors.toList());
          long duration = (System.nanoTime() - start) / 1_000_000;
          System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration); …
Run Code Online (Sandbox Code Playgroud)

executorservice java-8 threadpoolexecutor forkjoinpool completable-future

6
推荐指数
2
解决办法
6338
查看次数

在并行流中获取安全上下文时主体为空

当我尝试从并行流中的安全上下文获取主体时,如果主体不在主线程中,它总是返回 null。

当用户通过身份验证时,以下代码段失败:

listOfSomething.parallelStream()
                .foreach(el -> { 
if (SecurityContextHolder.getContext().getAuthentication().getPrincipal() == null){
            throw new RuntimeException();
}});
Run Code Online (Sandbox Code Playgroud)

文档说:

定义与当前执行线程关联的最小安全信息的接口。

但是,有什么办法可以做到吗?它在主线程中启动并使用 ForkJoinPool

谢谢你!

java spring stream spring-security spring-boot

6
推荐指数
1
解决办法
5601
查看次数

并行文件处理:推荐的方法是什么?

这是设计和代码问题的很大结合。

用例
- 给定范围内的许多日志文件(2MB - 2GB),我需要解析每个日志并应用一些处理,生成 Java POJO.
- 对于这个问题,我们假设我们只有1日志文件
- 另外,这个想法是充分利用 System. 可以使用多个核心。

替代方案 1
- 打开文件(同步),读取每一行,生成POJOs

FileActor -> read each line -> List<POJO>  
Run Code Online (Sandbox Code Playgroud)

优点:简单易懂
缺点:串行过程,没有利用系统中的多个核心

替代方案 2
- 打开文件(同步),读取N行(N可配置),传递给不同的参与者进行处理

                                                    / LogLineProcessActor 1
FileActor -> LogLineProcessRouter (with 10 Actors) -- LogLineProcessActor 2
                                                    \ LogLineProcessActor 10
Run Code Online (Sandbox Code Playgroud)

优点一些并行化,通过使用不同的参与者来处理部分线路。参与者将利用系统中的可用核心(?如何,可能?)
缺点仍然是串行的,因为文件以串行方式读取

问题
- 以上选择是一个不错的选择吗?
- 有更好的选择吗?

请在此提供宝贵的想法

多谢

java parallel-processing file-io akka typesafe

5
推荐指数
1
解决办法
3770
查看次数