使用Jersey SSE进行广播:检测已关闭的连接

Han*_*ank 6 java jersey server-sent-events

我相信这个问题不是与Jersey发送事件的重复:在客户端丢弃后EventOutput没有关闭,但可能与Jersey Server-Sent Events有关 - 写入断开的连接不会抛出异常.

在泽西岛文件的第15.4.2章中,描述了SseBroadcaster:

但是,SseBroadcaster在内部识别并处理客户端断开连接.当客户端关闭连接时,广播公司会检测到这种情况,并从已注册的EventOutputs的内部集合中删除过时连接,并释放与过时连接关联的所有服务器端资源.

我无法证实这一点.在下面的测试用例中,我看到子类SseBroadcasteronClose()方法永远不会被调用:不是在EventInput关闭时,而是在广播另一个消息时.

public class NotificationsResourceTest extends JerseyTest {
    final static Logger log = LoggerFactory.getLogger(NotificationsResourceTest.class);

    final static CountingSseBroadcaster broadcaster = new CountingSseBroadcaster();

    public static class CountingSseBroadcaster extends SseBroadcaster { 
        final AtomicInteger connectionCounter = new AtomicInteger(0);

        public EventOutput createAndAttachEventOutput() {
            EventOutput output = new EventOutput();
            if (add(output)) {
                int cons = connectionCounter.incrementAndGet();
                log.debug("Active connection count: "+ cons);
            }
            return output;
        }

        @Override
        public void onClose(final ChunkedOutput<OutboundEvent> output) {
            int cons = connectionCounter.decrementAndGet();
            log.debug("A connection has been closed. Active connection count: "+ cons);
        }

        @Override
        public void onException(final ChunkedOutput<OutboundEvent> chunkedOutput, final Exception exception) {
            log.trace("An exception has been detected", exception);
        }

        public int getConnectionCount() {
            return connectionCounter.get();
        }
    }

    @Path("notifications")
    public static class NotificationsResource {

        @GET
        @Produces(SseFeature.SERVER_SENT_EVENTS)
        public EventOutput subscribe() {
            log.debug("New stream subscription");

            EventOutput eventOutput = broadcaster.createAndAttachEventOutput();
            return eventOutput;
        }
    }   

    @Override
    protected Application configure() {
        ResourceConfig config = new ResourceConfig(NotificationsResource.class);
        config.register(SseFeature.class);

        return config;
    }


    @Test
    public void test() throws Exception {
        // check that there are no connections
        assertEquals(0, broadcaster.getConnectionCount());

        // connect subscriber
        log.info("Connecting subscriber");
        EventInput eventInput = target("notifications").request().get(EventInput.class);
        assertFalse(eventInput.isClosed());

        // now there are connections
        assertEquals(1, broadcaster.getConnectionCount());

        // push data
        log.info("Broadcasting data");
        String payload = UUID.randomUUID().toString();
        OutboundEvent chunk = new OutboundEvent.Builder()
                .mediaType(MediaType.TEXT_PLAIN_TYPE)
                .name("message")
                .data(payload)
                .build();
        broadcaster.broadcast(chunk);

        // read data
        log.info("Reading data");
        InboundEvent inboundEvent = eventInput.read();
        assertNotNull(inboundEvent);
        assertEquals(payload, inboundEvent.readData());

        // close subscription 
        log.info("Closing subscription");
        eventInput.close();
        assertTrue(eventInput.isClosed());

        // at this point, the subscriber has disconnected itself, 
        // but jersey doesnt realise that
        assertEquals(1, broadcaster.getConnectionCount());

        // wait, give TCP a chance to close the connection
        log.debug("Sleeping for some time");
        Thread.sleep(10000);

        // push data again, this should really flush out the not-connected client
        log.info("Broadcasting data again");
        broadcaster.broadcast(chunk);
        Thread.sleep(100);

        // there is no subscriber anymore
        assertEquals(0, broadcaster.getConnectionCount());  // FAILS!
    }
}
Run Code Online (Sandbox Code Playgroud)

也许JerseyTest不是测试这个的好方法.在EventSource使用JavaScript的较少临床设置中,我看到onClose()被调用,但只有在先前关闭的连接上广播消息之后.

我究竟做错了什么?

为什么没有SseBroadcaster检测到客户端关闭连接?

跟进

我发现JERSEY-2833Works设计拒绝了:

根据15.4.1 中SSE章节(https://jersey.java.net/documentation/latest/sse.html)中的Jersey文档,提到Jersey没有明确关闭连接,它是资源方法的责任或者客户端.

这究竟是什么意思?资源是否应执行超时并终止所有活动和已关闭的客户端连接?

bad*_*unk 0

我认为最好在资源上设置超时并仅终止该连接,例如:

@Path("notifications")
public static class NotificationsResource {

    @GET
    @Produces(SseFeature.SERVER_SENT_EVENTS)
    public EventOutput subscribe() {
        log.debug("New stream subscription");

        EventOutput eventOutput = broadcaster.createAndAttachEventOutput();
        new Timer().schedule( new TimerTask()
        {
            @Override public void run()
            {
               eventOutput.close()
            }
        }, 10000); // 10 second timeout
        return eventOutput;
    }
}   
Run Code Online (Sandbox Code Playgroud)