我正在学习使用java流过滤.但过滤后的流不会打印任何内容.我认为过滤方法没有被执行.我的过滤代码如下:
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
s.startsWith("b");
System.out.println("filter: " + s);
return true;
});
Run Code Online (Sandbox Code Playgroud)
没有编译错误,也没有例外.有什么建议吗?
我想将a Map转换为ConcurrentHashMap通过Java 8 Stream和Collector接口,我可以使用两个选项.
首先:
Map<Integer, String> mb = persons.stream()
.collect(Collectors.toMap(
p -> p.age,
p -> p.name,
(name1, name2) -> name1+";"+name2,
ConcurrentHashMap::new));
Run Code Online (Sandbox Code Playgroud)
第二个:
Map<Integer, String> mb1 = persons.stream()
.collect(Collectors.toConcurrentMap(
p -> p.age,
p -> p.name));
Run Code Online (Sandbox Code Playgroud)
哪一个是更好的选择?我什么时候应该使用每个选项?
如何在Java 8中实现一个算法,给定一个起始目录和一个文件名,用于搜索给定目录中的文件或嵌套不超过5个级别的任何子目录.
例如,考虑以下目录结构:
Folder 1
Folder 2
Folder 3
Folder 4
Folder 5
Folder 6
nfiles.txt....
MyFile.txt
xfile.txt
filesInFolder4....
filesInFolder3...
.....
Run Code Online (Sandbox Code Playgroud)
该算法应搜索文件夹中包含文件的文件,并报告是否存在具有给定文件名的文件?
如何使用Java 8做到这一点?
我正在使用Stream并行处理,并了解如果我使用平面阵列流,它会得到非常快速的处理.但如果我使用ArrayList,那么处理速度会慢一些.但是,如果我使用LinkedList或使用一些二进制树,处理速度会更慢.
所有听起来更像是流的可分割性,处理速度越快.这意味着阵列和数组列表在并行流的情况下最有效.这是真的吗?如果是这样,ArrayList如果我们想并行处理流,我们总是使用或者Array吗?如果是这样,如何使用LinkedList和BlockingQueue并行流?
另一件事是选择的中间函数的状态.如果我执行像无状态操作filter(),map(),性能高,但如果执行像国家提供充分的操作distinct(),sorted(),limit(),skip(),它需要大量的时间.再次,并行流变慢.这是否意味着我们不应该在并行流中使用状态全中间函数?如果是这样,那么解决这个问题的方法是什么?
我正在使用CompletableFuture来异步执行从列表源生成的流.
所以我正在测试重载方法,即CompletableFuture的"supplyAsync",其中一个方法只接受单个供应商参数,另一个方法接受供应商参数和执行者参数.以下是两者的文档:
一
supplyAsync(供应商供应商)
返回由ForkJoinPool.commonPool()中运行的任务异步完成的新CompletableFuture,其中包含通过调用给定供应商获得的值.
第二
supplyAsync(供应商供应商,执行执行人)
返回由给定执行程序中运行的任务异步完成的新CompletableFuture,其中包含通过调用给定供应商获得的值.
这是我的测试类:
public class TestCompleteableAndParallelStream {
public static void main(String[] args) {
List<MyTask> tasks = IntStream.range(0, 10)
.mapToObj(i -> new MyTask(1))
.collect(Collectors.toList());
useCompletableFuture(tasks);
useCompletableFutureWithExecutor(tasks);
}
public static void useCompletableFutureWithExecutor(List<MyTask> tasks) {
long start = System.nanoTime();
ExecutorService executor = Executors.newFixedThreadPool(Math.min(tasks.size(), 10));
List<CompletableFuture<Integer>> futures =
tasks.stream()
.map(t -> CompletableFuture.supplyAsync(() -> t.calculate(), executor))
.collect(Collectors.toList());
List<Integer> result =
futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration); …Run Code Online (Sandbox Code Playgroud) executorservice java-8 threadpoolexecutor forkjoinpool completable-future
我想在Java 8中试用ForkJoinPool,所以我编写了一个小程序,用于搜索名称中包含给定目录中特定关键字的所有文件.
计划:
public class DirectoryService {
public static void main(String[] args) {
FileSearchRecursiveTask task = new FileSearchRecursiveTask("./DIR");
ForkJoinPool pool = (ForkJoinPool) Executors.newWorkStealingPool();
List<String> files = pool.invoke(task);
pool.shutdown();
System.out.println("Total no of files with hello" + files.size());
}
}
class FileSearchRecursiveTask extends RecursiveTask<List<String>> {
private String path;
public FileSearchRecursiveTask(String path) {
this.path = path;
}
@Override
protected List<String> compute() {
File mainDirectory = new File(path);
List<String> filetedFileList = new ArrayList<>();
List<FileSearchRecursiveTask> recursiveTasks = new ArrayList<>();
if(mainDirectory.isDirectory()) {
System.out.println(Thread.currentThread() + " …Run Code Online (Sandbox Code Playgroud) 我正在使用 kafka-node link api 来创建 kafka 主题。我没有找到如何使用分区创建 kafka 主题。
var kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.Client(),
producer = new Producer(client);
// Create topics sync
producer.createTopics(['t','t1'], false, function (err, data) {
console.log(data);
});
// Create topics async
producer.createTopics(['t'], true, function (err, data) {});
producer.createTopics(['t'], function (err, data) {});// Simply omit 2nd arg
Run Code Online (Sandbox Code Playgroud)
如何在 nodejs 中创建带有分区的 kafka 主题。
我正在研究 Flux 和 Mono,并在多线程环境中使用它们,并使用提供工作线程的 Schedular。
使用elastic、parallel 和newElastic 启动Schedular 有很多选项。
这是我使用的代码:
System.out.println("------ elastic --------- ");
Flux.range(1, 10)
.map(i -> i / 2)
.publishOn(Schedulers.elastic()).log()
.blockLast();
System.out.println("------ new elastic --------- ");
Flux.range(1, 10)
.map(i -> i / 2).log()
.publishOn(Schedulers.newElastic("my")).log()
.blockLast();
Run Code Online (Sandbox Code Playgroud)
并且他们都有相同的文档:
调度程序动态创建基于 ExecutorService 的 Workers 并缓存线程池,并在 Workers 关闭后重用它们。
创建线程池的最大数量没有限制。
未使用的线程池的默认生存时间为 60 秒,使用适当的工厂推送不同的值。
该调度程序不可重新启动。
这是他们两个的日志:
------ elastic ---------
[ INFO] (main) | onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ INFO] (main) | request(unbounded)
[ INFO] (elastic-2) | onNext(0)
[ INFO] (elastic-2) | onNext(1)
[ INFO] (elastic-2) | onNext(1) …Run Code Online (Sandbox Code Playgroud) java multithreading scheduler project-reactor reactive-streams
我是 Spark 和 Scala 的新手,正在研究一个简单的 wordCount 示例。
因此,我使用 countByValue 如下:
val words = lines.flatMap(x => x.split("\\W+")).map(x => x.toLowerCase())
val wordCount = words.countByValue();
Run Code Online (Sandbox Code Playgroud)
这工作正常。
同样的事情可以实现:
val words = lines.flatMap(x => x.split("\\W+")).map(x => x.toLowerCase())
val wordCounts = words.map(x => (x, 1)).reduceByKey((x, y) => x + y)
val sortedWords = wordCounts.map(x => (x._2, x._1)).sortByKey()
Run Code Online (Sandbox Code Playgroud)
这也很好用。
现在,我的问题是什么时候使用哪些方法?哪一个比另一个更受欢迎?
我有一个应用程序,它使用嵌入式 janusgraph 和 cassandra 作为后端数据库。
以前,我使用 cassandrathrift 进行连接,并且工作正常。以下是旧配置:
storage.backend=cassandrathrift
storage.cassandra.keyspace=t_graph
Run Code Online (Sandbox Code Playgroud)
但我在超时方面遇到了一些问题。因此,我将配置从 cassandrathrift 更改为 cql。这是新配置:
storage.backend=cql
storage.cql.keyspace=t_graph
storage.cql.read-consistency-level=ONE
Run Code Online (Sandbox Code Playgroud)
而且,现在我收到以下错误:
> Caused by: org.springframework.beans.factory.BeanCreationException:
> Could not autowire field: private in.graph.services.GraphService
> in.graph.services.FollowService.graphService; nested exception is
> org.springframework.beans.factory.BeanCreationException: Error
> creating bean with name 'graphService': Invocation of init method
> failed; nested exception is java.lang.IllegalArgumentException: Could
> not instantiate implementation:
> org.janusgraph.diskstorage.cql.CQLStoreManager at
> org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor$AutowiredFieldElement.inject(AutowiredAnnotationBeanPostProcessor.java:573)
> at
> org.springframework.beans.factory.annotation.InjectionMetadata.inject(InjectionMetadata.java:88)
> at
> org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor.postProcessPropertyValues(AutowiredAnnotationBeanPostProcessor.java:331)
> ... 28 common frames omitted Caused by:
> …Run Code Online (Sandbox Code Playgroud) java-8 ×6
java ×5
java-stream ×3
forkjoinpool ×2
apache-kafka ×1
apache-spark ×1
cassandra ×1
collectors ×1
cql ×1
directory ×1
filesystems ×1
filter ×1
fork-join ×1
graph ×1
janusgraph ×1
nio ×1
node.js ×1
rdd ×1
scala ×1
scheduler ×1
stream ×1
word-count ×1