我有一个这样的组合发布者:
enum RemoteError: Error {
case networkError(Error)
case parseError(Error)
case emptyResponse
}
func getPublisher(url: URL) -> AnyPublisher<Entiy, RemoteError> {
return URLSession.shared
.dataTaskPublisher(for: url)
.map(\.data)
.decode(type: RemoteResponse.self, decoder: decoder)
.mapError { error -> RemoteError in
switch error {
case is URLError:
return .networkError(error)
default:
return .parseError(error)
}
}
.map { response -> Entiy in
response.enitities.last
}
.eraseToAnyPublisher()
}
struct RemoteResponse: Codable {
let enitities: [Entity]
let numberOfEntries: Int
}
struct Entity {
}
Run Code Online (Sandbox Code Playgroud)
通过上述设置,编译器会抱怨,因为response.enitities.last可能为零。问题是我可以用空发布者替换 nil 吗?如果不能,我可以用emptyResponse合并链中的错误替换它吗?第一个选项是更可取的。
有人可以帮助我理解上下文是如何在反应流中传递的。例如,请参见下面的代码:
Flux<Integer> expectedFluxWithContext = Flux.just(1, 2, 3, 4)
.flatMap(item -> Mono.just(item).contextWrite(Context.of("traceId", item)))
.doOnEach(signal -> System.out.println(signal.getContextView()));
Run Code Online (Sandbox Code Playgroud)
当我运行上面的代码时,我得到了我所期望的正确数据,但在doOnEach运算符中上下文是空的。
任何人都可以帮助我了解如何在流中共享上下文以及我可以做出哪些更改来完成这项工作。
java reactor reactive-programming project-reactor spring-webflux
如果我们提供相同的数据源,所有这些方法都会产生相同的结果。那么它们之间有什么区别呢?
java reactive-programming project-reactor reactive-streams spring-webflux
我想将以下代码转换为F#:
static void Main(string[] args)
{
var y = Observable.Create<int>(x =>
{
x.OnNext(5);
return (() => { });
});
y.Subscribe(x => Console.WriteLine(x));
}
Run Code Online (Sandbox Code Playgroud)
尝试了以下内容:
let ob = Observable.Create<int>(fun x ->
x.OnNext(5)
fun unit -> unit)
Run Code Online (Sandbox Code Playgroud)
但没有成功.我该怎么办?
我有一个设计用于akka-io acking的Actor,这样它会在向上游(到网络)发送消息时等待Ack.此actor是后端中异步应用程序的接口.
我想要一个包装层,它允许我将这个Actor转换为akka-streams,Flow[Incoming, Outgoing, ???]以便它可以与期望这种签名的新库集成.
(来自上游的传入消息很少见,所以我们不太关心那里的背压,但是拥有它不会是坏事.)
sealed trait Incoming //... with implementations
sealed trait Outgoing //... with implementations
object Ack
// `upstream` is an akka-io connection actor that will send Ack
// when it writes an Outgoing message to the socket
class SimpleActor(upstream: Actor) extends Actor {
def receive = {
case in: Incoming if sender() == upstream =>
// does some work in response to upstream
case other =>
// does some work in response to downstream …Run Code Online (Sandbox Code Playgroud) 我正在尝试创建一个Observable这样的,它将在一个时间间隔内从网络加载一些数据,并在用户刷新页面时.这是我到目前为止的要点:
PublishSubject<Long> refreshSubject = PublishSubject.create();
Observable<MyDataType> observable = Observable.merge(
Observable.interval(0, 3, TimeUnit.SECONDS),
refreshSubject
)
.flatMap(t -> {
// network operations that eventually return a value
// these operations are not observables themselves
// they are fully blocking network operations
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> {
// update ui with data
}, error -> {
// do something with error
});
Run Code Online (Sandbox Code Playgroud)
后来在刷新回调中我有:
refreshSubject.onNext(0L);
Run Code Online (Sandbox Code Playgroud)
它在间隔很好的情况下运行,但是当我刷新时,它会爆炸NetworkOnMainThreadException.我以为我用subscribeOn/ 处理了这个observeOn.我错过了什么?另外,为什么当Observer从间隔触发时这不会导致崩溃?
假设我有一个可以从类型中发出元素或者失败的Single调用(在某些语言中会是这样).那是:s_0t_0TSingle<T>
s_0: -- t_0 // Success
OR
s_0: -- X // Failure
Run Code Online (Sandbox Code Playgroud)
从类型实例T有一个next()返回一个可选的方法Single从式T以及(一个Single<T>?在科特林).这种行为导致一系列Single实例能够发出一系列T实例,其中每个实例都s_i可以发出一个t_i能够返回下s_i+1一个单元的元素t_i+1,该元素将发出一个元素,依此类推,直到最后一个元素t_n-1不返回单个或任何一个元素为止单打失败:
s_0: -- t_0
?
s_1: -- t_1
?
s_2: -- t_2
...
?
s_n-1: -- t_n-1
?
null
OR
s_0: -- t_0
?
s_1: -- t_1
?
s_2: -- t_2
...
?
s_i: -- X …Run Code Online (Sandbox Code Playgroud) 我有一个Spring boot 1.4.3项目.最近我提出了一个要求,我必须将日志从服务器发送到我的Web应用程序并在网页上打印日志.我知道WebSockets,但我正在寻找更好的解决方案,我遇到了,Reactive Programming和gRPC.
Spring在Spring版本5中支持Reactive Programming,但我对gRPC和Reactive Programming非常困惑.gRPC具有双向流,它建立在Netty之上,提供与从服务器向客户端推送数据(如Reactive Programming)相同的功能.那么我应该使用哪一个,如果你能在这种混乱中清除我,那将是非常好的.
此外,如果我转移到支持Spring Version 5的Spring Boot 2,该项目将在Netty上运行.我的困惑是,我是否必须在不同的容器上运行我的应用程序,例如Jetty服务器上的普通REST端点和netty服务器上的Reactive API,或者Spring将通过处理netty上的响应请求和剩余的一般REST来为我开箱即用Jetty服务器上的API,因为据我所知Netty不是Servlet容器.
你能帮助我使用Array和流(?)来使用它来使用单个元素(String)将Movie保存到db并返回FLux.Spring特定的东西并不重要 - 只是迭代字母表和创建随机电影的方式.这样做的最佳和最科学的方法是什么?
val alphabet = arrayOf("A".."Z")
val exampleMovies: Flux<Movie> = Flux.just(alphabet)
.flatMap { movieRepository.save(Movie(name = it)) }
Run Code Online (Sandbox Code Playgroud)
我收到编译错误:
Error:(15, 62) Kotlin: Type mismatch: inferred type is Array<ClosedRange<String>>! but String? was expected
Run Code Online (Sandbox Code Playgroud) 我需要一个长按事件来绑定到svelte 3中的按钮。我想以尽可能少的“ boilerplaty”方式进行此操作。
我已经尝试过使用长按功能,但是这似乎有些令人费解和棘手,也似乎有点慢。
function longPress(node, callback) {
console.log(node)
function onmousedown(event) {
const timeout = setTimeout(() => callback(node.innerHTML), 1000);
function cancel() {
clearTimeout(timeout);
node.removeEventListener("mouseup", cancel, false);
}
node.addEventListener("mouseup", cancel, false);
}
node.addEventListener("mousedown", onmousedown, false);
return {
destroy() {
node.removeEventListener("mousedown", onmousedown, false);
}
};
}
</script>
<div>
<Video />
{#each Object.entries(bindings) as [id, value]}
<button on:click = {()=>longPress(this,addImage)}> {id} </button>
{/each}
</div>
Run Code Online (Sandbox Code Playgroud)
这可行,但是我敢肯定有更好的方法。
java ×3
kotlin ×2
rx-java ×2
spring ×2
akka ×1
akka-stream ×1
android ×1
c# ×1
c#-to-f# ×1
combine ×1
dom-events ×1
f# ×1
javascript ×1
range ×1
reactor ×1
rx-android ×1
spring-boot ×1
svelte ×1
swift ×1