标签: reactive-programming

Vertx 中同步发送 EventBus 消息

我想通过 Vertx 中的 EventBus 同步发送多条消息。我想发送一条消息,等待它,然后再发送下一条消息。地址是一样的。如果是默认的话我该怎么做?还是必须使用executeBlocking代码?

这是我的代码。

public class EventBusSync {
    private Vertx vertx = Vertx.vertx();
    private static final String SERVICE_ADDRESS =  "service.worker";
    
  public void sentViaEvBus() {
    String message1 = "message1";
    String message2 = "message2";
    
    String reply1 = sendCommand(SERVICE_ADDRESS,message1);
    String reply2 = sendCommand(SERVICE_ADDRESS,message2);

  }

  private String sendCommand(String address, String command) {
   String message;
   vertx.eventBus().send(address,command, handler -> {
    if(handler.succeeded()) {
     log.info("success");
   } else {
     log.error("error",handler.cause());
     throw new RuntimeException("ERROR");
    }
    message = handler.result.body();
    });
 return message;
  }
 }
Run Code Online (Sandbox Code Playgroud)

所以在这里,如果发送第一个命令并且发生了一些事情,我想中断下一个事件总线的发送。

谢谢

java reactive-programming event-bus vert.x rx-java

0
推荐指数
1
解决办法
2852
查看次数

保存在 ReactiveCrudRepository 中不插入或更新记录

如标题所述,我无法在 Postgres 数据库中插入或更新记录。我刚刚开始使用 spring 和 kotlin,所以可能缺少一些非常基本的配置。提前致谢

这是我的代码库

用户存储库

@Repository
interface UserRepository : ReactiveCrudRepository<User, Int> {}
Run Code Online (Sandbox Code Playgroud)

用户模型 @Table(“user_app”)

data class User (
    @Id
    @Column("id")
    @GeneratedValue(strategy=GenerationType.IDENTITY)
    val id : Int? =  null,

    @Column("username")
    val username : String?,

    @Column("email")
    val email : String?,

    @Column("name")
    val name : String?,

    @Column("password")
    val password : String?
)
Run Code Online (Sandbox Code Playgroud)

用户控制器 @Component

class UserController{

    @Autowired
    private lateinit var userRepository : UserRepository

    fun getAllUsers() : Flux<User> = userRepository.findAll()

    fun getUserById( userId: Int) : Mono<User> = userRepository.findById(userId)

    fun createUser(user: User): Mono<User> …
Run Code Online (Sandbox Code Playgroud)

postgresql reactive-programming kotlin spring-data-r2dbc r2dbc

0
推荐指数
1
解决办法
7763
查看次数

当客户端中止请求时,WebFlux 如何停止发布者?

SpringBoot v2.5.1

有一个端点请求长时间运行的进程结果,并且它是以某种方式创建的
(为简单起见,它是Mono.fromCallable( ... long running ... ).

客户端发出请求并触发发布者执行工作,但几秒钟后客户端中止请求(即连接丢失)。并且该过程仍然继续利用资源来计算丢弃的结果。

通知 Project Reactor 事件循环有关应取消的不必要的正在进行的工作的机制是什么?

@RestController 
class EndpointSpin {
 
  @GetMapping("/spin")
  Mono<Long> spin() {
    AtomicLong counter = new AtomicLong(0);
    Instant stopTime = Instant.now().plus(Duration.of(1, ChronoUnit.HOURS));

    return Mono.fromCallable(() -> {

      while (Instant.now().isBefore(stopTime)) {
        counter.incrementAndGet();

        if (counter.get() % 10_000_000 == 0) {
          System.out.println(counter.get());
        }

        // of course this does not work
        if (Thread.currentThread().isInterrupted()){
           break;
        }
      }

      return counter.get();
    });
  }
}
Run Code Online (Sandbox Code Playgroud)

java reactive-programming cancellation project-reactor spring-webflux

0
推荐指数
1
解决办法
1388
查看次数

如何将 Web 客户端响应转换为 ResponseEntity?

我可以使用exchange()现已弃用的方法将 WebClient 响应转换为响应实体。

请建议达到相同结果的其他方法。下面是我的代码。

public ResponseEntity<TestClass> getTestDetails() {
            ClientResponse clientResponse = webClientBuilder.build()
                    .get()
                    .uri("http://localhost:9090/test")
                    .headers(httpHeaders -> {
                        httpHeaders.add(Constants.ACCEPT, Constants.APPLICATION_JSON);
                        })
                    .exchange()
                    .block();
            
            return clientResponse.toEntity(TestClass.class).block();
}
Run Code Online (Sandbox Code Playgroud)

spring reactive-programming spring-boot spring-webflux

0
推荐指数
1
解决办法
5123
查看次数

将 List&lt;String&gt; 转换为 Mono&lt;List&lt;String&lt;&gt;&gt; Java WebFlux

我正在尝试使用 web WebFlux 将字符串列表转换为 Java 中字符串列表的 Mono,也许很简单,但我找不到解决方案。

这是清单

List<String> lstString = new ArrayList<>();
lstString.add("Hello");
lstString.add("How");
lstString.add("are");
lstString.add("you?");


//Expected result
Mono<List<String>> lstMonoList =lstString.stream.map()....
Run Code Online (Sandbox Code Playgroud)

我尝试使用流、平面地图和地图方法,但我经常得到流()o列表的响应

有人可以帮助我吗?,提前致谢

java reactive-programming spring-webflux

0
推荐指数
1
解决办法
3981
查看次数

Mono.defualtEmpty() 与 Mono.switchIfEmpty()

当上游发出 nullValue 时,我们可以使用 'Mono.defualtIfEmpty()' 或 'Mono.switchIfEmpty()' 来替换 null 值。

switchIfEmpty()评价上游价值急切。因此我们使用 Mono.defer() 进行惰性求值。

  1. 'Mono.defualtIfEmpty()' 也是急于求值吗?像 switchIfEmpty() 一样?

  2. 如何更改 Mono.defualtIfEmpty() 来进行惰性评估?

reactive-programming project-reactor reactive spring-webflux

0
推荐指数
1
解决办法
4765
查看次数

订阅未来的可观察性

我有基于事件的API(Geolocator),我想转换为Rx.

问题是某些操作要求取消订阅所有事件,并且我不想将该burdon传递给Rx API的用户.

因此,用户将订阅一些可观察对象,并且当订阅事件时,它们将被发布到那些可观察对象.

最好的方法是什么?

我想到了创建用户订阅的主题,然后通过另一组可观察对象将事件发布给那些主题.

这是最好的方法吗?如果是这样,怎么样?

reactive-programming system.reactive

-2
推荐指数
1
解决办法
297
查看次数

是否允许在Java中将顶级类声明为静态

我刚开始使用Akka框架,我在他们的文档中看到了以下代码.有人可以解释一下这段代码吗?

是否允许将顶级类声明为静态?

我找到了一个类似问题的帖子.它表示顶级类不能在Java中声明为静态,但在此示例中,顶级顶级代码已声明为静态!我错过了什么吗?

static class MyActorC implements Creator<MyActor> {
    @Override public MyActor create() {
        return new MyActor("...");
    }
}

Props props2 = Props.create(MyActor.class, "...");
Props props3 = Props.create(new MyActorC());
Run Code Online (Sandbox Code Playgroud)

java reactive-programming akka

-2
推荐指数
1
解决办法
94
查看次数

RxJava2 switchIfEmpty并验证执行

所以..想象我有一个像这样的方法构造:

LocalDatabase:

public Observable<PoiObject> getPoiObject() {

    return Observable.defer {
        PoiObject object = poiDao.getPoiObject();
        if(object == null) {
            return Observable.empty();
        }
        else {
            return Observable.just(object);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

现在,我在其他地方有另一种方法,如下所示:

服务:

public Observable<PoiObject> getPoiObject() {
    return localDatabase.getPoiObject()
}

public Observable<PoiObject> getItFromWeb() {
    return restService.getObject()
}
Run Code Online (Sandbox Code Playgroud)

如果我尝试将Service :: getPoiObject的调用链接到这样的Rx调用:

用例:

public Observable<SomeVM> getObject() {
    return service.getPoiObject()
        .switchIfEmpty(service.getItFromWeb())
}
Run Code Online (Sandbox Code Playgroud)

然后以下单元测试失败:

@Test
public void test_getObject() {
    Service service = mock()

    when(service.getPoiObject()).thenReturn(any());

    Observable<SomeVM> observable = usecase.getObject();
    verify(service).getPoiObject();
    verify(service, times(0)).getItFromWeb();
}
Run Code Online (Sandbox Code Playgroud)

为什么getItFromWeb()在清楚地执行时,前一个调用不为空(从service.getPoiObject()调用返回对象).是否还有其他策略可以测试switchIfEmpty?

android reactive-programming kotlin reactive rx-java2

-2
推荐指数
1
解决办法
1350
查看次数

在RX中切换流:Sodium相当于RX中的合并和切换

如何能在电视频道的问题,因为在此解释的谈话在第31分钟的中通过RX解决?

Rx中表达的问题如下:

两个电视频道(channel1channel2)传输图像流,加上其中的流fuzz表示没有频道或白噪声.

有两个按钮可以发送事件eButton1eButton2按下它们.

按下这些按钮应该导致各个通道被发送到屏幕.

每个按钮按下应该被投影(映射)到相应的频道,然后所有频道组合成选择流作为以流开始的fuzz流的流.最后,交换机操作员将选定的流发送给screen.

什么相当于Sodiumswitch并在RX中合并?

是否有可能用纯高阶函数解决它?即不使用闭包?我不明白这是怎么可能的.

在此输入图像描述

c# reactive-programming system.reactive sodium

-3
推荐指数
1
解决办法
1257
查看次数