Tim*_*kiy 8 java spring spring-mvc server-sent-events
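I am trying to send server-sent events with Spring 4 (Tomcat 7, servlet-api 3.0.1).
The problem is that my events are not sent when I call the send method. They all reach the client at the same time (with identical timestamps), only after the SseEmitter times out, together with an EventSource error event. The client then tries to reconnect. Any idea what is going on?
I created a simple service: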
@RequestMapping(value = "subscribe", method = RequestMethod.GET)
public SseEmitter subscribe() throws IOException {
    final SseEmitter emitter = new SseEmitter();
    Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                emitter.send(SseEmitter.event().data("Thread writing: " + Thread.currentThread()).name("ping"));
            } catch (Exception e) {
            }
        }
    }, 1000, 1000, TimeUnit.MILLISECONDS);
    return emitter;
}
Client code:
sse = new EventSource(urlBuilder(base, url));
sse.addEventListener('ping', function (event) {
    dfd.notify(event);
});
sse.addEventListener('message', function (event) {
    dfd.notify(event);
});
sse.addEventListener('close', function (event) {
    dfd.notify(event);
});
sse.onerror = function (error) {
    console.log(error);
};
sse.onmessage = function (event) {
    dfd.notify(event);
};
App initializer code:
public class WebAppInitializer implements WebApplicationInitializer {
    @Override
    public void onStartup(ServletContext servletContext) throws ServletException {
        AnnotationConfigWebApplicationContext ctx = new AnnotationConfigWebApplicationContext();
        ctx.register(AppConfig.class);
        ctx.setServletContext(servletContext);
        ctx.refresh();
        ServletRegistration.Dynamic dynamic = servletContext.addServlet("dispatcher", new DispatcherServlet(ctx));
        dynamic.setAsyncSupported(true);
        dynamic.addMapping("/api/*");
        dynamic.setLoadOnStartup(1);
        dynamic.setMultipartConfig(ctx.getBean(MultipartConfigElement.class));
        javax.servlet.FilterRegistration.Dynamic filter = servletContext
                .addFilter("StatelessAuthenticationFilter",
                        ctx.getBean("statelessAuthenticationFilter", StatelessAuthenticationFilter.class));
        filter.setAsyncSupported(true);
        filter.addMappingForUrlPatterns(null, false, "/api/*");
        filter = servletContext.addFilter("HibernateSessionRequestFilter",
                ctx.getBean("hibernateSessionRequestFilter", HibernateSessionRequestFilter.class));
        filter.setAsyncSupported(true);
        filter.addMappingForUrlPatterns(null, false, "/api/user/*");
    }
}
AppConfig.java
@Configuration
@ComponentScan("ru.esoft.workflow")
@EnableWebMvc
@PropertySource({"classpath:mail.properties", "classpath:fatclient.properties"})
@EnableAsync
@EnableScheduling
public class AppConfig extends WebMvcConfigurerAdapter {
...
}
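I ran into this problem myself while testing SseEmitter. From everything I have read online, SseEmitter is meant to be used together with some implementation of Reactive Streams, such as RxJava. It is a bit complicated, but it definitely works. The idea is that you create the emitter and an Observable, and subscribe the latter to a Publisher. The Publisher does its work on a separate thread and notifies the Observable when output is ready, and the Observable then calls emitter.send. Here is a sample snippet that should do what you want: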
@RequestMapping("/whatever")
public SseEmitter index() {
    SseEmitter emitter = new SseEmitter();
    Publisher<String> responsePublisher = someResponseGenerator.getPublisher();
    // Adapt the Reactive Streams Publisher to an RxJava Observable
    Observable<String> responseObservable = RxReactiveStreams.toObservable(responsePublisher);
    responseObservable.subscribe(
        // onNext: forward each item to the client as an SSE event
        str -> {
            try {
                emitter.send(str);
            } catch (IOException ex) {
                emitter.completeWithError(ex);
            }
        },
        // onError: end the SSE stream with the error
        error -> {
            emitter.completeWithError(error);
        },
        // onCompleted: end the SSE stream normally
        emitter::complete
    );
    return emitter;
}
Here is the corresponding publisher:
public class SomeResponseGenerator {
    public Publisher<String> getPublisher() {
        Publisher<String> pub = new Publisher<String>() {
            @Override
            public void subscribe(final Subscriber<? super String> subscriber) {
                // Simplified: a spec-compliant Publisher would call
                // subscriber.onSubscribe(...) first and honor requested demand.
                Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        // Emit one message per second from the scheduler thread
                        subscriber.onNext("Thread writing: " + Thread.currentThread().getName());
                    }
                }, 1000, 1000, TimeUnit.MILLISECONDS);
            }
        };
        return pub;
    }
}
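For readers who want to try the hand-off without pulling in Spring or RxJava, here is a minimal sketch using only the JDK (Java 9+). It swaps in `java.util.concurrent.Flow` and `SubmissionPublisher` for the Reactive Streams/RxJava types above. `EventSink`, `SinkSubscriber`, and `FlowToSinkSketch` are hypothetical names invented for this illustration; `EventSink` only stands in for the three `SseEmitter` calls used above (`send`, `complete`, `completeWithError`) and is not the Spring class.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class FlowToSinkSketch {

    /** Hypothetical stand-in for SseEmitter (assumption, not a Spring type). */
    interface EventSink {
        void send(String data) throws Exception;
        void complete();
        void completeWithError(Throwable error);
    }

    /** Forwards every published item to the sink, one item at a time. */
    static final class SinkSubscriber implements Flow.Subscriber<String> {
        private final EventSink sink;
        private Flow.Subscription subscription;

        SinkSubscriber(EventSink sink) {
            this.sink = sink;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1); // ask for the first item
        }

        @Override
        public void onNext(String item) {
            try {
                sink.send(item);
                subscription.request(1); // ask for the next item
            } catch (Exception ex) {
                subscription.cancel();
                sink.completeWithError(ex);
            }
        }

        @Override
        public void onError(Throwable t) {
            sink.completeWithError(t);
        }

        @Override
        public void onComplete() {
            sink.complete();
        }
    }

    /** Publishes `count` messages and returns what the sink received. */
    static List<String> run(int count) throws InterruptedException {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        EventSink sink = new EventSink() {
            @Override public void send(String data) { received.add(data); }
            @Override public void complete() { done.countDown(); }
            @Override public void completeWithError(Throwable e) { done.countDown(); }
        };
        // The publisher delivers items to subscribers on its own executor threads.
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new SinkSubscriber(sink));
            for (int i = 1; i <= count; i++) {
                publisher.submit("event " + i);
            }
        } // close() signals onComplete once the submitted items are delivered
        done.await(5, TimeUnit.SECONDS);
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(3));
    }
}
```

In a real controller the sink would be the `SseEmitter` itself; the sketch only shows the producer-thread-to-emitter hand-off and says nothing about how the servlet container buffers or flushes the response.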
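There are a few examples of this pattern online, and you can find more by searching for "RxJava SseEmitter". It takes some time to understand how Reactive Streams, RxJava, and SseEmitter interact, but once you do, it is quite elegant. Hope this sets you on the right path!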