我想通过 Vertx 中的 EventBus 同步发送多条消息。我想发送一条消息,等待它,然后再发送下一条消息。地址是一样的。如果是默认的话我该怎么做?还是必须使用executeBlocking代码?
这是我的代码。
public class EventBusSync {
private Vertx vertx = Vertx.vertx();
private static final String SERVICE_ADDRESS = "service.worker";
public void sentViaEvBus() {
String message1 = "message1";
String message2 = "message2";
String reply1 = sendCommand(SERVICE_ADDRESS,message1);
String reply2 = sendCommand(SERVICE_ADDRESS,message2);
}
private String sendCommand(String address, String command) {
String message;
vertx.eventBus().send(address,command, handler -> {
if(handler.succeeded()) {
log.info("success");
} else {
log.error("error",handler.cause());
throw new RuntimeException("ERROR");
}
message = handler.result.body();
});
return message;
}
}
Run Code Online (Sandbox Code Playgroud)
所以在这里,如果发送第一个命令并且发生了一些事情,我想中断下一个事件总线的发送。
谢谢
如标题所述,我无法在 Postgres 数据库中插入或更新记录。我刚刚开始使用 spring 和 kotlin,所以可能缺少一些非常基本的配置。提前致谢
这是我的代码库
用户存储库
@Repository
interface UserRepository : ReactiveCrudRepository<User, Int> {}
Run Code Online (Sandbox Code Playgroud)
用户模型 @Table(“user_app”)
data class User (
@Id
@Column("id")
@GeneratedValue(strategy=GenerationType.IDENTITY)
val id : Int? = null,
@Column("username")
val username : String?,
@Column("email")
val email : String?,
@Column("name")
val name : String?,
@Column("password")
val password : String?
)
Run Code Online (Sandbox Code Playgroud)
用户控制器 @Component
class UserController{
@Autowired
private lateinit var userRepository : UserRepository
fun getAllUsers() : Flux<User> = userRepository.findAll()
fun getUserById( userId: Int) : Mono<User> = userRepository.findById(userId)
fun createUser(user: User): Mono<User> …Run Code Online (Sandbox Code Playgroud) postgresql reactive-programming kotlin spring-data-r2dbc r2dbc
SpringBoot v2.5.1
有一个端点请求长时间运行的进程结果,并且它是以某种方式创建的
(为简单起见,它是Mono.fromCallable( ... long running ... ).
客户端发出请求并触发发布者执行工作,但几秒钟后客户端中止请求(即连接丢失)。并且该过程仍然继续利用资源来计算丢弃的结果。
通知 Project Reactor 事件循环有关应取消的不必要的正在进行的工作的机制是什么?
@RestController
class EndpointSpin {
@GetMapping("/spin")
Mono<Long> spin() {
AtomicLong counter = new AtomicLong(0);
Instant stopTime = Instant.now().plus(Duration.of(1, ChronoUnit.HOURS));
return Mono.fromCallable(() -> {
while (Instant.now().isBefore(stopTime)) {
counter.incrementAndGet();
if (counter.get() % 10_000_000 == 0) {
System.out.println(counter.get());
}
// of course this does not work
if (Thread.currentThread().isInterrupted()){
break;
}
}
return counter.get();
});
}
}
Run Code Online (Sandbox Code Playgroud) java reactive-programming cancellation project-reactor spring-webflux
我可以使用exchange()现已弃用的方法将 WebClient 响应转换为响应实体。
请建议达到相同结果的其他方法。下面是我的代码。
public ResponseEntity<TestClass> getTestDetails() {
ClientResponse clientResponse = webClientBuilder.build()
.get()
.uri("http://localhost:9090/test")
.headers(httpHeaders -> {
httpHeaders.add(Constants.ACCEPT, Constants.APPLICATION_JSON);
})
.exchange()
.block();
return clientResponse.toEntity(TestClass.class).block();
}
Run Code Online (Sandbox Code Playgroud) 我正在尝试使用 web WebFlux 将字符串列表转换为 Java 中字符串列表的 Mono,也许很简单,但我找不到解决方案。
这是清单
List<String> lstString = new ArrayList<>();
lstString.add("Hello");
lstString.add("How");
lstString.add("are");
lstString.add("you?");
//Expected result
Mono<List<String>> lstMonoList =lstString.stream.map()....
Run Code Online (Sandbox Code Playgroud)
我尝试使用流、平面地图和地图方法,但我经常得到流()o列表的响应
有人可以帮助我吗?,提前致谢
当上游发出 nullValue 时,我们可以使用 'Mono.defualtIfEmpty()' 或 'Mono.switchIfEmpty()' 来替换 null 值。
但switchIfEmpty()评价上游价值急切。因此我们使用 Mono.defer() 进行惰性求值。
'Mono.defualtIfEmpty()' 也是急于求值吗?像 switchIfEmpty() 一样?
如何更改 Mono.defualtIfEmpty() 来进行惰性评估?
reactive-programming project-reactor reactive spring-webflux
我有基于事件的API(Geolocator),我想转换为Rx.
问题是某些操作要求取消订阅所有事件,并且我不想将该burdon传递给Rx API的用户.
因此,用户将订阅一些可观察对象,并且当订阅事件时,它们将被发布到那些可观察对象.
最好的方法是什么?
我想到了创建用户订阅的主题,然后通过另一组可观察对象将事件发布给那些主题.
这是最好的方法吗?如果是这样,怎么样?
我刚开始使用Akka框架,我在他们的文档中看到了以下代码.有人可以解释一下这段代码吗?
是否允许将顶级类声明为静态?
我找到了一个类似问题的帖子.它表示顶级类不能在Java中声明为静态,但在此示例中,顶级顶级代码已声明为静态!我错过了什么吗?
static class MyActorC implements Creator<MyActor> {
@Override public MyActor create() {
return new MyActor("...");
}
}
Props props2 = Props.create(MyActor.class, "...");
Props props3 = Props.create(new MyActorC());
Run Code Online (Sandbox Code Playgroud) 所以..想象我有一个像这样的方法构造:
LocalDatabase:
public Observable<PoiObject> getPoiObject() {
return Observable.defer {
PoiObject object = poiDao.getPoiObject();
if(object == null) {
return Observable.empty();
}
else {
return Observable.just(object);
}
}
}
Run Code Online (Sandbox Code Playgroud)
现在,我在其他地方有另一种方法,如下所示:
服务:
public Observable<PoiObject> getPoiObject() {
return localDatabase.getPoiObject()
}
public Observable<PoiObject> getItFromWeb() {
return restService.getObject()
}
Run Code Online (Sandbox Code Playgroud)
如果我尝试将Service :: getPoiObject的调用链接到这样的Rx调用:
用例:
public Observable<SomeVM> getObject() {
return service.getPoiObject()
.switchIfEmpty(service.getItFromWeb())
}
Run Code Online (Sandbox Code Playgroud)
然后以下单元测试失败:
@Test
public void test_getObject() {
Service service = mock()
when(service.getPoiObject()).thenReturn(any());
Observable<SomeVM> observable = usecase.getObject();
verify(service).getPoiObject();
verify(service, times(0)).getItFromWeb();
}
Run Code Online (Sandbox Code Playgroud)
为什么getItFromWeb()在清楚地执行时,前一个调用不为空(从service.getPoiObject()调用返回对象).是否还有其他策略可以测试switchIfEmpty?
如何能在电视频道的问题,因为在此解释的谈话在第31分钟的中通过RX解决?
Rx中表达的问题如下:
两个电视频道(channel1和channel2)传输图像流,加上其中的流fuzz表示没有频道或白噪声.
有两个按钮可以发送事件eButton1和eButton2按下它们.
按下这些按钮应该导致各个通道被发送到屏幕.
每个按钮按下应该被投影(映射)到相应的频道,然后所有频道组合成选择流作为以流开始的fuzz流的流.最后,交换机操作员将选定的流发送给screen.
什么相当于Sodiumswitch并在RX中合并?
是否有可能用纯高阶函数解决它?即不使用闭包?我不明白这是怎么可能的.
