小编Kay*_*ayV的帖子

Flux.concat 和 Flux.concatWith 之间的区别

我是反应流新手,正在学习使用 concat/concatWith 方法组合两个发布者(具体来说是 Flux)。

我可以用 concat 方法做的所有事情,都可以使用 concatWith 方法来实现。这是我使用的示例案例:

        Mono<String> mono1 = Mono.just(" karan ");
        Mono<String> mono2 = Mono.just(" | verma ");
        Mono<String> mono3 = Mono.just(" | kv ");

        Flux<String> flux1 = Flux.just(" {1} ","{2} ","{3} ","{4} " );
        Flux<String> flux2 = Flux.just(" |A|"," |B| "," |C| ");

        // FLux emits item each 500ms
        Flux<String> intervalFlux1 = Flux.interval(Duration.ofMillis(1000))
                                        .zipWith(flux1, (i, string) -> string);

        // FLux emits item each 700ms       
        Flux<String> intervalFlux2 = Flux
                                .interval(Duration.ofMillis(1000))
                                .zipWith(flux2, (i, string) -> string);



        System.out.println("**************Flux …
Run Code Online (Sandbox Code Playgroud)

reactive-programming project-reactor reactive-streams

3
推荐指数
1
解决办法
8510
查看次数

Java 8 Stream - 使用Custom Collector时出现NullPointerException

我通过实现Collector接口并覆盖其方法来实现自定义收集器.Collector实现如下:

class MyCustomCollector implements Collector<Person, StringJoiner, String>{

    @Override
    public Supplier<StringJoiner> supplier() {
        // TODO Auto-generated method stub
        return () -> new StringJoiner("|");
    }

    @Override
    public BiConsumer<StringJoiner, Person> accumulator() {
        // TODO Auto-generated method stub
        return (joiner,person) -> joiner.add(person.name.toUpperCase());
    }

    @Override
    public BinaryOperator<StringJoiner> combiner() {
        // TODO Auto-generated method stub
        return (joiner1, joiner2) -> joiner1.merge(joiner2);
    }

    @Override
    public Function<StringJoiner, String> finisher() {
        // TODO Auto-generated method stub
        return StringJoiner::toString;
    }

    @Override
    public Set<java.util.stream.Collector.Characteristics> characteristics() {
        // TODO Auto-generated method stub
        return …
Run Code Online (Sandbox Code Playgroud)

java-8 java-stream collectors

2
推荐指数
1
解决办法
1047
查看次数

如何使用Optional来检查java 8中的空值?

我有一个包含嵌套和内部类的类.在某些程序中,我使用遗留java方式中的相等运算符检查对象的空值.以下是我的代码片段:

class Outer {
    Nested nested;
    Nested getNested() {
        return nested;
    }
}
class Nested {
    Inner inner;
    Inner getInner() {
        return inner;
    }
}
class Inner {
    String foo;
    String getFoo() {
        return foo;
    }
}
Run Code Online (Sandbox Code Playgroud)

这是我如何对引用变量进行空检查:

Outer outer = new Outer();
if (outer != null && outer.nested != null && outer.nested.inner != null) {
    System.out.println(outer.nested.inner.foo);
}
Run Code Online (Sandbox Code Playgroud)

我知道这也可以使用java 8的Optional类来完成,但是我没有办法在上面的特定场景中做同样的事情.有什么建议吗?

java optional java-8 java-stream

2
推荐指数
2
解决办法
3963
查看次数

ReentrantLock 与 stampedlock 有什么区别?更喜欢哪一个?

在 ReentrantLock 和 StampedLock 之间进行选择的用例应该是什么?例如,如果我有 10 个读者和 10 个写者,应该选择哪种锁?如果我有 20 位读者和 1 位作者,该选择哪一个?

java concurrency locking reentrantlock java-8

2
推荐指数
1
解决办法
1465
查看次数

collectAndThen方法是否足够有效?

我最近开始使用collectAndThen,发现与其他编码程序相比,它需要花费相当长的时间,我用它来执行类似的任务.

这是我的代码:

        System.out.println("CollectingAndThen");
        Long t = System.currentTimeMillis();
        String personWithMaxAge = persons.stream()
                                        .collect(Collectors.collectingAndThen(
                                                                Collectors.maxBy(Comparator.comparing(Person::getAge)),
                                                                (Optional<Person> p) -> p.isPresent() ? p.get().getName() : "none"
                                                ));


        System.out.println("personWithMaxAge - "+personWithMaxAge + " time taken = "+(System.currentTimeMillis() - t));
        Long t2 = System.currentTimeMillis();
        String personWithMaxAge2 = persons.stream().sorted(Comparator.comparing(Person::getAge).reversed())
                                                    .findFirst().get().name;
        System.out.println("personWithMaxAge2 : "+personWithMaxAge2+ " time taken = "+(System.currentTimeMillis() - t2));
Run Code Online (Sandbox Code Playgroud)

在这里输出:

CollectingAndThen
personWithMaxAge - Peter time taken = 17
personWithMaxAge2 : Peter time taken = 1
Run Code Online (Sandbox Code Playgroud)

这表明收集和收集时间相对较长.

所以我的问题是 - 我应该继续收集和其他建议吗?

java collect java-8 java-stream collectors

2
推荐指数
3
解决办法
1894
查看次数

如何在 Spring5 中使用 BodyInserters 插入请求正文?

我正在使用 Sping webflux 模块并创建一个 WebClient,请求 uri 和请求正文如下:

// create webclient
WebClient wc3 = WebClient.builder()
                            .baseUrl("http://localhost:8080")
                            .defaultCookie("key", "val")
                            .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                            .build();
// set uri
WebClient.RequestBodySpec uri1 = wc3.method(HttpMethod.POST).uri("/getDocs");


//  set a request body
WebClient.RequestBodySpec requestSpec1 = WebClient.create().method(HttpMethod.POST).uri("/getDocs")
                                                .body(BodyInserters.fromPublisher(Mono.just("data")), String.class);
Run Code Online (Sandbox Code Playgroud)

当我设置请求正文时,我收到以下编译错误:

Multiple markers at this line
    - Type mismatch: cannot convert from Mono<String> to P
    - The method fromPublisher(P, Class<T>) in the type BodyInserters is not applicable for the arguments 
     (Mono<String>)
Run Code Online (Sandbox Code Playgroud)

Java 编辑器仅显示“在文件中重命名”作为建议。

我不确定我是否完美地使用了 BodyInserters。请建议。

spring spring-boot

2
推荐指数
1
解决办法
1万
查看次数

如何通过将两个 Flux 中的值配对到一个元组中来组合发布者?

假设我有两个 Flux,如下所示:

    Flux<Integer> f1 = Flux.just(10,20,30,40);
    Flux<Integer> f2 = Flux.just(100,200,300,400);
Run Code Online (Sandbox Code Playgroud)

现在我想要的是将这些通量组合成单个通量或两个通量的元组,这将在单个通量中具有两个通量的元素。

我使用 zipwith 方法尝试了以下操作:

  Flux<Integer, Integer> zipped = f1.zipWith(f2,
                                            (one, two) ->  one + "," +two)
                                    .subscribe();
Run Code Online (Sandbox Code Playgroud)

但这会产生编译时错误:

 Incorrect number of arguments for type Flux<T>; it cannot be parameterized with arguments <Integer, Integer>
Run Code Online (Sandbox Code Playgroud)

我怎样才能做到这一点?请建议。

java publisher publish-subscribe project-reactor reactive-streams

2
推荐指数
1
解决办法
2965
查看次数

Java 8 Stream - Reduce函数的组合器没有被执行

我使用一个简单的reduce方法,有三个参数即.身份,累加器和组合器.这是我的代码......

Integer ageSumComb = persons
            .stream()
            .reduce(0,
                (sum, p) -> {
                    System.out.println("Accumulator: Sum= "+ sum + " Person= " + p);
                    return sum += p.age;
                },
                (sum1, sum2) -> {
                    System.out.format("Combiner: Sum1= " + sum1 + " Sum2= "+ sum2);
                    return sum1 + sum2;
Run Code Online (Sandbox Code Playgroud)

但正在发生的事情是Combiner没有被执行.我没有得到这背后的原因.这是我的输出..

Accumulator: Sum= 0 Person= Max
Accumulator: Sum= 18 Person= Peter
Accumulator: Sum= 41 Person= Pamela
Accumulator: Sum= 64 Person= David
Accumulator: Sum= 76 Person= Pam
Run Code Online (Sandbox Code Playgroud)

但是,没有编译错误,没有异常,我的输出完全正确,与我的预期相同.但是没有得到为什么组合器没有执行.

java reduce java-8 combiners java-stream

1
推荐指数
1
解决办法
316
查看次数

Flux.compose和Flux.transform之间的区别?

我正在学习反应流,并在Publishers(Flux)上工作,并致力于Flux的转型。为此,我得到了撰写和转换方法。

这是我的代码:

private static void composeStream() {
    System.out.println("*********Calling composeStream************");
    Function<Flux<String>, Flux<String>> alterMap = f -> {
                                                                return f.filter(color -> !color.equals("ram"))
                                                                        .map(String::toUpperCase);
                                                            };

    Flux<String> compose = Flux.fromIterable(Arrays.asList("ram", "sam", "kam", "dam"))
                                    .doOnNext(System.out::println)
                                    .compose(alterMap);

    compose.subscribe(d -> System.out.println("Subscriber to Composed AlterMap :"+d));
    System.out.println("-------------------------------------");

}

private static void transformStream() {
    System.out.println("*********Calling transformStream************");
    Function<Flux<String>, Flux<String>> alterMap = f -> f.filter(color -> !color.equals("ram"))
                                                            .map(String::toUpperCase);

    Flux.fromIterable(Arrays.asList("ram", "sam", "kam", "dam"))
            .doOnNext(System.out::println)
            .transform(alterMap)
            .subscribe(d -> System.out.println("Subscriber to Transformed AlterMap: "+d));
    System.out.println("-------------------------------------");
}
Run Code Online (Sandbox Code Playgroud)

这是输出,这两种情况都相同:

*********Calling transformStream************
ram
sam
Subscriber to Transformed …
Run Code Online (Sandbox Code Playgroud)

publisher publish-subscribe reactive-programming project-reactor reactive-streams

1
推荐指数
1
解决办法
774
查看次数

kafka 主题中理想的分区数是多少?

我正在学习 Kafka 并尝试为我最近的搜索应用程序创建一个主题。推送到 kafka 主题的数据被认为是一个很大的数字。

我的 kafka 集群有 3 个代理,并且已经为其他需求创建了主题。

现在我应该为最近的搜索主题选择多少个分区?如果我没有明确提供分区号怎么办?选择分区号需要考虑哪些事项?

apache-kafka kafka-consumer-api kafka-producer-api kafka-topic

0
推荐指数
1
解决办法
2340
查看次数

printf 在 scala 中如何工作?

我正在学习 Scala 和 Spark,想要打印一些格式化的日志。

这是我得到的一些例子:

val flag : Boolean = true 
val charA : Char = 'a' 
val piVal : Float = 3.14159265f
val num : Int = 1 

println(f"val of pi = $piVal%.3f")

println(f"another formatting : $num%05d")

println(s"values like $num $flag $charA")

println(s"evaluate expression = ${1+2}")
Run Code Online (Sandbox Code Playgroud)

但我不明白这一切的意义。

请建议何时使用 f 以及何时将 s 与 printf 一起使用以及使用它的格式是什么?

formatting scala

-1
推荐指数
1
解决办法
4974
查看次数