小编Kay*_*ayV的帖子

Java 8 Stream - 为什么filter方法没有执行?

我正在学习使用java流过滤.但过滤后的流不会打印任何内容.我认为过滤方法没有被执行.我的过滤代码如下:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        s.startsWith("b");
        System.out.println("filter: " + s);
        return true;
    });
Run Code Online (Sandbox Code Playgroud)

没有编译错误,也没有例外.有什么建议吗?

java stream filter java-8 java-stream

6
推荐指数
1
解决办法
847
查看次数

Collectors.toConcurrentMap和通过Collectors.toMap供应商选项将Map转换为ConcurrentHashMap有什么区别?

我想将a Map转换为ConcurrentHashMap通过Java 8 StreamCollector接口,我可以使用两个选项.

首先:

Map<Integer, String> mb = persons.stream()
                                 .collect(Collectors.toMap(
                                            p -> p.age, 
                                            p -> p.name, 
                                            (name1, name2) -> name1+";"+name2,
                                            ConcurrentHashMap::new));
Run Code Online (Sandbox Code Playgroud)

第二个:

Map<Integer, String> mb1 = persons.stream()
                                  .collect(Collectors.toConcurrentMap(
                                             p -> p.age, 
                                             p -> p.name));
Run Code Online (Sandbox Code Playgroud)

哪一个是更好的选择?我什么时候应该使用每个选项?

java concurrenthashmap java-8 java-stream collectors

6
推荐指数
2
解决办法
3625
查看次数

使用Java 8在目录和子目录中查找文件

如何在Java 8中实现一个算法,给定一个起始目录和一个文件名,用于搜索给定目录中的文件或嵌套不超过5个级别的任何子目录.

例如,考虑以下目录结构:

Folder 1
   Folder 2
      Folder 3
        Folder 4
            Folder 5
                Folder 6
                    nfiles.txt....
                MyFile.txt
                xfile.txt
            filesInFolder4....
        filesInFolder3...
   .....
Run Code Online (Sandbox Code Playgroud)

该算法应搜索文件夹中包含文件的文件,并报告是否存在具有给定文件名的文件?

如何使用Java 8做到这一点?

java filesystems directory nio java-8

6
推荐指数
1
解决办法
8469
查看次数

可分性和有状态对流并行处理的影响

我正在使用Stream并行处理,并了解如果我使用平面阵列流,它会得到非常快速的处理.但如果我使用ArrayList,那么处理速度会慢一些.但是,如果我使用LinkedList或使用一些二进制树,处理速度会更慢.

所有听起来更像是流的可分割性,处理速度越快.这意味着阵列和数组列表在并行流的情况下最有效.这是真的吗?如果是这样,ArrayList如果我们想并行处理流,我们总是使用或者Array吗?如果是这样,如何使用LinkedListBlockingQueue并行流?

另一件事是选择的中间函数的状态.如果我执行像无状态操作filter(),map(),性能高,但如果执行像国家提供充分的操作distinct(),sorted(),limit(),skip(),它需要大量的时间.再次,并行流变慢.这是否意味着我们不应该在并行流中使用状态全中间函数?如果是这样,那么解决这个问题的方法是什么?

parallel-processing java-8 java-stream

6
推荐指数
2
解决办法
148
查看次数

默认的ForkJoinPool执行器需要很长时间

我正在使用CompletableFuture来异步执行从列表源生成的流.

所以我正在测试重载方法,即CompletableFuture的"supplyAsync",其中一个方法只接受单个供应商参数,另一个方法接受供应商参数和执行者参数.以下是两者的文档:

supplyAsync(供应商供应商)

返回由ForkJoinPool.commonPool()中运行的任务异步完成的新CompletableFuture,其中包含通过调用给定供应商获得的值.

第二

supplyAsync(供应商供应商,执行执行人)

返回由给定执行程序中运行的任务异步完成的新CompletableFuture,其中包含通过调用给定供应商获得的值.

这是我的测试类:

public class TestCompleteableAndParallelStream {

    public static void main(String[] args) {
        List<MyTask> tasks = IntStream.range(0, 10)
                .mapToObj(i -> new MyTask(1))
                .collect(Collectors.toList());

        useCompletableFuture(tasks);

        useCompletableFutureWithExecutor(tasks);

    }

    public static void useCompletableFutureWithExecutor(List<MyTask> tasks) {
          long start = System.nanoTime();
          ExecutorService executor = Executors.newFixedThreadPool(Math.min(tasks.size(), 10));
          List<CompletableFuture<Integer>> futures =
              tasks.stream()
                   .map(t -> CompletableFuture.supplyAsync(() -> t.calculate(), executor))
                   .collect(Collectors.toList());

          List<Integer> result =
              futures.stream()
                     .map(CompletableFuture::join)
                     .collect(Collectors.toList());
          long duration = (System.nanoTime() - start) / 1_000_000;
          System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration); …
Run Code Online (Sandbox Code Playgroud)

executorservice java-8 threadpoolexecutor forkjoinpool completable-future

6
推荐指数
2
解决办法
6338
查看次数

ForkJoinPool - 为什么程序抛出OutOfMemoryError?

我想在Java 8中试用ForkJoinPool,所以我编写了一个小程序,用于搜索名称中包含给定目录中特定关键字的所有文件.

计划:

public class DirectoryService {

    public static void main(String[] args) {
        FileSearchRecursiveTask task = new FileSearchRecursiveTask("./DIR");
        ForkJoinPool pool = (ForkJoinPool) Executors.newWorkStealingPool();
        List<String> files = pool.invoke(task);
        pool.shutdown();
        System.out.println("Total  no of files with hello" + files.size());
    }

}

    class FileSearchRecursiveTask extends RecursiveTask<List<String>> {
        private String path;
        public FileSearchRecursiveTask(String path) {
            this.path = path;
        }

        @Override
        protected List<String> compute() {
            File mainDirectory = new File(path);
            List<String> filetedFileList = new ArrayList<>();
            List<FileSearchRecursiveTask> recursiveTasks = new ArrayList<>();
            if(mainDirectory.isDirectory()) {
                System.out.println(Thread.currentThread() + " …
Run Code Online (Sandbox Code Playgroud)

java multithreading fork-join java-8 forkjoinpool

6
推荐指数
1
解决办法
846
查看次数

如何在nodejs中创建带有分区的kafka主题?

我正在使用 kafka-node link api 来创建 kafka 主题。我没有找到如何使用分区创建 kafka 主题。

var kafka = require('kafka-node'),
    Producer = kafka.Producer,
    client = new kafka.Client(),
    producer = new Producer(client);
// Create topics sync
producer.createTopics(['t','t1'], false, function (err, data) {
    console.log(data);
});
// Create topics async
producer.createTopics(['t'], true, function (err, data) {});

producer.createTopics(['t'], function (err, data) {});// Simply omit 2nd arg
Run Code Online (Sandbox Code Playgroud)

如何在 nodejs 中创建带有分区的 kafka 主题。

node.js apache-kafka

5
推荐指数
1
解决办法
4010
查看次数

Schedulers.newElastic 和 Schedulers.elastic 方法之间有什么区别?

我正在研究 Flux 和 Mono,并在多线程环境中使用它们,并使用提供工作线程的 Schedular。

使用elastic、parallel 和newElastic 启动Schedular 有很多选项。

这是我使用的代码:

    System.out.println("------ elastic ---------  ");
    Flux.range(1, 10)
      .map(i -> i / 2)
      .publishOn(Schedulers.elastic()).log()
      .blockLast();
    
    System.out.println("------ new elastic ---------  ");
    Flux.range(1, 10)
      .map(i -> i / 2).log()
      .publishOn(Schedulers.newElastic("my")).log()
      .blockLast();
Run Code Online (Sandbox Code Playgroud)

并且他们都有相同的文档:

调度程序动态创建基于 ExecutorService 的 Workers 并缓存线程池,并在 Workers 关闭后重用它们。

创建线程池的最大数量没有限制。

未使用的线程池的默认生存时间为 60 秒,使用适当的工厂推送不同的值。

该调度程序不可重新启动。

这是他们两个的日志:

------ elastic ---------  
[ INFO] (main) | onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ INFO] (main) | request(unbounded)
[ INFO] (elastic-2) | onNext(0)
[ INFO] (elastic-2) | onNext(1)
[ INFO] (elastic-2) | onNext(1) …
Run Code Online (Sandbox Code Playgroud)

java multithreading scheduler project-reactor reactive-streams

5
推荐指数
1
解决办法
7508
查看次数

何时使用 countByValue 何时使用 map().reduceByKey()

我是 Spark 和 Scala 的新手,正在研究一个简单的 wordCount 示例。

因此,我使用 countByValue 如下:

val words = lines.flatMap(x => x.split("\\W+")).map(x => x.toLowerCase())
val wordCount = words.countByValue();
Run Code Online (Sandbox Code Playgroud)

这工作正常。

同样的事情可以实现:

val words = lines.flatMap(x => x.split("\\W+")).map(x => x.toLowerCase())
val wordCounts = words.map(x => (x, 1)).reduceByKey((x, y) => x + y)
val sortedWords = wordCounts.map(x => (x._2, x._1)).sortByKey()
Run Code Online (Sandbox Code Playgroud)

这也很好用。

现在,我的问题是什么时候使用哪些方法?哪一个比另一个更受欢迎?

scala word-count apache-spark rdd

5
推荐指数
3
解决办法
5627
查看次数

Janusgraph:无法实例化实现:org.janusgraph.diskstorage.cql.CQLStoreManager

我有一个应用程序,它使用嵌入式 janusgraph 和 cassandra 作为后端数据库。

以前,我使用 cassandrathrift 进行连接,并且工作正常。以下是旧配置:

storage.backend=cassandrathrift
storage.cassandra.keyspace=t_graph
Run Code Online (Sandbox Code Playgroud)

但我在超时方面遇到了一些问题。因此,我将配置从 cassandrathrift 更改为 cql。这是新配置:

storage.backend=cql
storage.cql.keyspace=t_graph
storage.cql.read-consistency-level=ONE
Run Code Online (Sandbox Code Playgroud)

而且,现在我收到以下错误:

> Caused by: org.springframework.beans.factory.BeanCreationException:
> Could not autowire field: private in.graph.services.GraphService
> in.graph.services.FollowService.graphService; nested exception is
> org.springframework.beans.factory.BeanCreationException: Error
> creating bean with name 'graphService': Invocation of init method
> failed; nested exception is java.lang.IllegalArgumentException: Could
> not instantiate implementation:
> org.janusgraph.diskstorage.cql.CQLStoreManager    at
> org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor$AutowiredFieldElement.inject(AutowiredAnnotationBeanPostProcessor.java:573)
>   at
> org.springframework.beans.factory.annotation.InjectionMetadata.inject(InjectionMetadata.java:88)
>   at
> org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor.postProcessPropertyValues(AutowiredAnnotationBeanPostProcessor.java:331)
>   ... 28 common frames omitted Caused by:
> …
Run Code Online (Sandbox Code Playgroud)

graph cql cassandra janusgraph

5
推荐指数
0
解决办法
1525
查看次数