小编aka*_*okd的帖子

处理 Mono Inside Flux 平面图

我有一串弦。对于每个字符串,我必须进行远程调用。但问题是,进行远程调用的方法实际上返回了 Mono 响应(显然,因为对应于单个请求,所以将有单个响应)。

处理此类情况的正确模式应该是什么?我能想到的一种解决方案是对流元素进行串行(或并行)调用,并将响应减少为单个响应并返回。

这是代码:

fluxObj.flatmap(a -> makeRemoteCall(a)//converts the Mono of the response to a Flux).reduce(...)
Run Code Online (Sandbox Code Playgroud)

我无法将头转入 . 该flatmap方法makeRemoteCall返回一个Mono. 但返回的是响应的flatmapa 。Flux首先,为什么会发生这种情况?其次,这是否意味着返回Flux包含单个响应对象(在 中返回Mono)?

java functional-programming project-reactor

4
推荐指数
1
解决办法
9432
查看次数

share()和publish().autoConnect()之间有什么区别?

根据我的观点.我认为share()是相同的publish().autoConnect().但在这段代码中结果并不相同

Observable<Integer> cold = Observable.create(subscriber -> {
        for (int i = 0; i <= 2; i++) {
            System.out.println("Hot Observable Emit " + i);
            subscriber.onNext(i);
        }
    });

    ConnectableObservable<Integer> connectble = cold.publish().autoConnect(2);
    //Obserable(Integer) connectavle = cold.share();
    connectble.subscribe(subscriber1);
    connectble.subscribe(subscriber2);
Run Code Online (Sandbox Code Playgroud)

publish(). autoConnect() output

Hot Observable Emit 0
Subscriber 1 : 0
Subscriber 2 : 0
....
Run Code Online (Sandbox Code Playgroud)

share() output

Hot Observable Emit 0
Subscriber 1 : 0
//the subscriber2 not receive event
Run Code Online (Sandbox Code Playgroud)

在这种情况下,我们不能share()在rx-java中使用多播?我找到了一个结论但不适合这种情况.

测试环境:oracle jdk1.8 …

rx-java

3
推荐指数
1
解决办法
1526
查看次数

主线程不等待订阅者完成反应式订阅者的任务

我在春季有一项服务,需要使用十种不同的方法来获取数据。

我想让这些方法并行执行来执行一些数据库操作并返回到父线程。但父线程应该等待所有响应到来,然后返回响应。

在我当前的方法中,我使用反应式单声道异步执行所有方法,但主线程不等待订阅者方法完成。

以下是我订阅的两种方法

private Mono<BaseResponse> getProfileDetails(long profileId){
        return new Mono<BaseResponse>() {

            @Override
            public void subscribe(Subscriber<? super BaseResponse> s) {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // DB Operation
                System.out.println("Inside getProfileDetails");
                s.onNext(new BaseResponse());
            }
        };
    }

private Mono<Address> getAddressDetails(long profileId){
        return new Mono<Address>() {

            @Override
            public void subscribe(Subscriber<? super Address> s) {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // DB Operation
                System.out.println("Inside getAddressDetails");
                s.onNext(new Address());
            }
        };
    }
Run Code Online (Sandbox Code Playgroud)

下面是我的主要方法

public BaseResponse getDetails(long …
Run Code Online (Sandbox Code Playgroud)

java spring multithreading project-reactor spring-webflux

3
推荐指数
1
解决办法
2380
查看次数

字符串中的换行符未写入文件

我正在尝试编写一个程序来处理从文件中读入的unicode字符串.我想到了两种方法 - 一种是我读取包含换行符的整个文件,执行几次正则表达式替换,然后将其写回另一个文件; 另一个我在文件中逐行读取并匹配各行并替换它们并将其写出来的地方.我无法测试第一种方法,因为字符串中的换行符不会写为文件的换行符.以下是一些示例代码:

String output = "Hello\nthere!";
BufferedWriter oFile = new BufferedWriter(new OutputStreamWriter(
    new FileOutputStream("test.txt"), "UTF-16"));

System.out.println(output);
oFile.write(output);
oFile.close();
Run Code Online (Sandbox Code Playgroud)

print语句输出

你好
!

但文件内容是

你好!

为什么我的换行不写入文件?

java string unicode file-io

2
推荐指数
1
解决办法
8839
查看次数

对Enum Singletons有处罚吗?

是否有(表演)惩罚*以任何方式与Enum Singleton模式相关联,因为它似乎比传统的单一模式或内部持有者类习惯用得少?

*惩罚,例如在不需要的情况下可串行化的成本等,或者因为很少有开发人员阅读Effective Java 2nd Edition而使用率很低?

java singleton enums

2
推荐指数
1
解决办法
2630
查看次数

了解可流动背压 rxjava2

我把这个虚拟例子放在一起,试图backpressure更好地理解:

Flowable.range(1, 100).onBackpressureDrop()
                      .subscribeOn(Schedulers.io())
                      .observeOn(AndroidSchedulers.mainThread())
                      .subscribeWith(object : DisposableSubscriber<Int>() {
                        override fun onStart() {
                          request(1)
                        }

                        override fun onComplete() {
                          Log.d(this@MainActivity::class.java.simpleName, "onComplete")
                        }

                        override fun onNext(t: Int?) {
                          Log.d(this@MainActivity::class.java.simpleName, t.toString())
                          Thread.sleep(1000)
                          request(1)
                        }

                        override fun onError(t: Throwable?) { //handle error}
                      })
Run Code Online (Sandbox Code Playgroud)

我有一个非常慢的Subscriber消耗数据的非常快的Flowable. 我正在指示 Flowable 到onBackPressureDrop(). 尽管如此,我的输出看起来像这样(从 1 到 100)

07-16 23:07:21.097 22389-22389 D: 1
07-16 23:07:22.100 22389-22389 D: 2
07-16 23:07:23.102 22389-22389 D: 3
07-16 23:07:24.104 22389-22389 D: ...
07-16 23:07:24.104 22389-22389 D: …
Run Code Online (Sandbox Code Playgroud)

rx-java2

2
推荐指数
1
解决办法
1264
查看次数

提高在java中将查询结果写入CSV的性能

我有以下代码执行查询并将其直接写入字符串缓冲区,然后将其转储到CSV文件.我需要写大量的记录(最多一百万).这适用于一百万条记录,对于一个大约200mb的文件大约需要半小时!在我看来好像很多时间,不确定这是否是最好的.即使它包括使用其他jar/db连接工具,请向我推荐更好的方法.

....
eventNamePrepared = con.prepareStatement(gettingStats + 
    filterOptionsRowNum + filterOptions);
ResultSet rs = eventNamePrepared.executeQuery(); 
int i=0;
try{
......
FileWriter fstream = new FileWriter(realPath + 
    "performanceCollectorDumpAll.csv");
BufferedWriter out = new BufferedWriter(fstream);
StringBuffer partialCSV = new StringBuffer();


while (rs.next()) { 
  i++;
  if (current_appl_id_col_display) 
      partialCSV.append(rs.getString("current_appl_id") + ",");
  if (event_name_col_display) 
      partialCSV.append(rs.getString("event_name") + ",");
  if (generic_method_name_col_display) 
      partialCSV.append(rs.getString("generic_method_name") + ",");
  ..... // 23 more columns to be copied same way to buffer
  partialCSV.append(" \r\n");
  // Writing to file after 10000 records to prevent partialCSV 
  // from going …
Run Code Online (Sandbox Code Playgroud)

java performance

1
推荐指数
2
解决办法
6305
查看次数

如何安全地从多个线程填充字节数组?

有没有办法安全地从多个线程填充一个字节数组(例如,第一个线程填充前半部分,第二个线程使用System.arraycopy填充后半部分)而不使用Java 6或7同步数组本身?jsr166相关库只包含int数组(AtomicIntegerArray,ParallelIntegerArray).

java multithreading bytearray

1
推荐指数
1
解决办法
1780
查看次数

为什么在 flatmap 中模拟 Mono.empty() 的响应会以错误结束?

我们有一个 methodA 它返回:

return bObj.methodB()
    .flatMap(x -> cObj.methodC(x));
Run Code Online (Sandbox Code Playgroud)

methodB 返回Mono<String>,methodC 返回Mono<Void>

使用 Mockito 时,我们可以模拟 methodB,但不能模拟 methodC。

when(bObj.methodB()).thenReturn(Mono.just("x"));
when(cObj.methodC(eq("x"))).thenReturn(Mono.empty());

aObj.methodA();
Run Code Online (Sandbox Code Playgroud)

当它运行时,我们得到以下异常:

java.lang.NullPointerException: The mapper returned a null Publisher
Run Code Online (Sandbox Code Playgroud)

当运行应用程序时,这可以正常工作,但当使用 Mockito 运行时,却不能。我认为这与 flatMap 有关,但无法弄清楚。参考这个问题似乎应该有效,但事实并非如此。有任何想法吗?

java spring-boot reactive

1
推荐指数
1
解决办法
3848
查看次数

mainThread上的combineLatest()函数

我想结合2个observable并在AndroidSchedules.mainThread()上执行"combine function".我添加了.observeOn(AndroidSchedules.mainThread()),但我仍然得到"java.lang.IllegalStateException:不在主线程上".

Observable<List<Post>> animateCameraAndGetPostsByProjection = Observable.combineLatest(
            mapObservableProvider.getMapReadyObservable(),
            LocationService.getUpdatedOrLastKnownLocation(this),
            (googleMap, location) -> {
                CameraUpdate cameraUpdate = CameraUpdateFactory.newLatLngZoom(new LatLng(location.getLatitude(),location.getLongitude()),15);
                googleMap.moveCamera(cameraUpdate);
                return googleMap.getProjection().getVisibleRegion();
            })
            .flatMap(vr -> new RestService().getApi().getPostsByMapProjection(vr.farLeft.latitude,vr.farLeft.longitude,vr.nearRight.latitude,vr.nearRight.longitude))
            .observeOn(AndroidSchedulers.mainThread());
Run Code Online (Sandbox Code Playgroud)

rx-java rx-android

0
推荐指数
1
解决办法
1148
查看次数