Java RMI + SSL + Compression = IMPOSSIBLE!

Dra*_*Fax 9 java compression ssl rpc rmi

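I've set up RMI + SSL. This works great. But it doesn't seem possible to slip compression in between RMI and SSL, so that the RMI requests are compressed before they are sent over SSL.

I've seen some posts online suggesting the SSLSocketFactory.createSocket() variant that takes a Socket, to wrap SSL over a compressing socket. But that seems like it would try to compress the SSL protocol itself, which probably isn't very compressible.

I thought I should create a Socket proxy (a subclass of Socket that defers to another Socket, like FilterOutputStream does), have the proxy wrap the input/output streams with compression, and have my SocketFactory and ServerSocketFactory return the proxies, wrapping the SSLSocket.

But then we run into the buffering issue. Compression buffers the data until there is enough to be worth compressing, or until it is told to flush. This is fine when you don't have back-and-forth communication over the socket. But with cached sockets in RMI, you do have it, and there is no way to identify the end of an RMI request so that you could flush the compressed data.

Sun has an RMISocketFactory example doing something like this, but it doesn't address this issue at all.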

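Notes:
1. SSL supports compression, but I can't find anything about enabling it in JSSE.
2. I know that compression on lots of small unrelated blocks (which is what RMI traffic usually consists of) isn't very beneficial.
3. I know that if I'm sending large requests, RMI isn't the best choice.
4. There is an SSLRMISocketFactory in Java 6, but it doesn't add anything over my custom implementation.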

Paŭ*_*ann 28

We have several problems here:

  • We can't simply wrap SocketFactories around each other, like we can do for InputStreams and OutputStreams.
  • Java's zlib-based DeflaterOutputStream does not implement flushing.

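I think I found a mechanism which seems to work.

This will be a series in several parts, as it takes some time to write. (You can find the source code of the finished parts in my github repository.)

A custom SocketImpl

A Socket is always based on an object implementing SocketImpl. Thus, having a custom socket in fact means using a custom SocketImpl class. Here is an implementation based on a pair of streams (and a base socket, for closing purposes):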

/**
 * A SocketImpl implementation which works on a pair
 * of streams.
 *
 * An instance of this class represents an already
 * connected socket, thus all the methods relating to
 * connecting, accepting and such are not implemented.
 *
 * The implemented methods are {@link #getInputStream},
 * {@link #getOutputStream}, {@link #available} and the
 * shutdown methods {@link #close}, {@link #shutdownInput},
 * {@link #shutdownOutput}.
 */
private static class WrappingSocketImpl extends SocketImpl {
    private InputStream inStream;
    private OutputStream outStream;

    private Socket base;

    WrappingSocketImpl(StreamPair pair, Socket base) {
        this.inStream = pair.input;
        this.outStream = pair.output;
        this.base = base;
    }

A StreamPair is a simple data holder class, see below.

These are the important methods:

    protected InputStream getInputStream() {
        return inStream;
    }

    protected OutputStream getOutputStream() {
        return outStream;
    }

    protected int available() throws IOException {
        return inStream.available();
    }

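Then there are some methods to allow closing. These are not really tested (maybe we should also close or at least flush the streams?), but they seem to work for our RMI usage.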

    protected void close() throws IOException {
        base.close();
    }

    protected void shutdownInput() throws IOException {
        base.shutdownInput();
        // TODO: inStream.close() ?
    }

    protected void shutdownOutput() throws IOException {
        base.shutdownOutput();
        // TODO: outStream.close()?
    }

The next few methods will be called by the Socket constructor (or indirectly by something in the RMI engine), but don't really have to do anything.

    protected void create(boolean stream) {
        if(!stream) {
            throw new IllegalArgumentException("datagram socket not supported.");
        }
    }

    public Object getOption(int optID) {
        System.err.println("getOption(" + optID + ")");
        return null;
    }

    public void setOption(int optID, Object value) {
        // noop, as we don't have any options.
    }

All the remaining methods are not necessary. We implement them to throw exceptions (so we will notice if this assumption was wrong).

    // unsupported operations

    protected void connect(String host, int port) {
        System.err.println("connect(" + host + ", " + port + ")");
        throw new UnsupportedOperationException();
    }


    protected void connect(InetAddress address, int port) {
        System.err.println("connect(" + address + ", " + port + ")");
        throw new UnsupportedOperationException();
    }

    protected void connect(SocketAddress addr, int timeout) {
        System.err.println("connect(" + addr + ", " + timeout + ")");
        throw new UnsupportedOperationException();
    }

    protected void bind(InetAddress host, int port) {
        System.err.println("bind(" + host + ", " + port + ")");
        throw new UnsupportedOperationException();
    }

    protected void listen(int backlog) {
        System.err.println("listen(" + backlog + ")");
        throw new UnsupportedOperationException();
    }

    protected void accept(SocketImpl otherSide) {
        System.err.println("accept(" + otherSide + ")");
        throw new UnsupportedOperationException();
    }

    protected void sendUrgentData(int data) {
        System.err.println("sendUrgentData()");
        throw new UnsupportedOperationException();
    }
}

Here is the StreamPair used by the constructor:

/**
 * A simple holder class for a pair of streams.
 */
public static class StreamPair {
    public InputStream input;
    public OutputStream output;
    public StreamPair(InputStream in, OutputStream out) {
        this.input = in; this.output = out;
    }
}

Next part: using this to implement a Socket factory.


A Socket factory, wrapping another one.

We are dealing here with RMI socket factories (i.e. RMIClientSocketFactory, RMIServerSocketFactory, RMISocketFactory in java.rmi.server), but the same idea applies to other libraries using a socket factory interface as well. Examples are javax.net.SocketFactory (and ServerSocketFactory), Apache Axis' SocketFactory, JSch's SocketFactory.

Often, the idea of these factories is that they connect to a server other than the original one (a proxy), do some negotiating, and then either can simply continue on the same connection or have to tunnel the real connection through some other protocol (using wrapping streams). We instead want to let some other socket factory do the original connecting, and then do only the stream wrapping ourselves.

RMI has separate interfaces for the client and server socket factories. The client socket factories will be serialized and passed from the server to the client together with the remote stubs, allowing the client to reach the server.

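There is also an abstract class RMISocketFactory which implements both interfaces and provides a VM-global default socket factory, which will be used for all remote objects that don't have their own.

We will now implement a subclass of this class (thereby also implementing both interfaces), allowing the user to provide a base client and a base server socket factory, which we then will use. Our class must be serializable to allow passing it to the clients.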

/**
 * A base class for RMI socket factories which do their
 * work by wrapping the streams of Sockets from another
 * Socket factory.
 *
 * Subclasses have to override the {@link #wrap} method.
 *
 * Instances of this class can be used as both client and
 * server socket factories, or as only one of them.
 */
public abstract class WrappingSocketFactory 
    extends RMISocketFactory
    implements Serializable
{

(Imagine all the rest indented relative to this class.)

As we want to refer to other factories, here are the fields.

/**
 * The base client socket factory. This will be serialized.
 */
private RMIClientSocketFactory baseCFactory;

/**
 * The base server socket factory. This will not be serialized,
 * since the server socket factory is used only on the server side.
 */
private transient RMIServerSocketFactory baseSFactory;

These will be initialized by straightforward constructors (which I don't repeat here; look at the github repository for the full code).
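The effect of the transient modifier on these two fields can be checked in isolation. The following is only a sketch under assumptions: PlainClientFactory and FactoryHolder are hypothetical stand-ins (not classes from the repository), and plain Java serialization to a byte array stands in for what RMI does when it ships a stub to a client.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.ServerSocket;
import java.net.Socket;
import java.rmi.server.RMIClientSocketFactory;
import java.rmi.server.RMIServerSocketFactory;

/** Hypothetical serializable client factory that opens plain sockets. */
class PlainClientFactory implements RMIClientSocketFactory, Serializable {
    private static final long serialVersionUID = 1L;

    @Override
    public Socket createSocket(String host, int port) throws IOException {
        return new Socket(host, port);
    }
}

/** Hypothetical holder with the same two kinds of fields as WrappingSocketFactory. */
class FactoryHolder implements Serializable {
    private static final long serialVersionUID = 1L;

    RMIClientSocketFactory clientFactory;           // serialized, travels to the client
    transient RMIServerSocketFactory serverFactory; // skipped, stays on the server

    FactoryHolder(RMIClientSocketFactory clientFactory,
                  RMIServerSocketFactory serverFactory) {
        this.clientFactory = clientFactory;
        this.serverFactory = serverFactory;
    }

    /** Serializes this holder to a byte array and reads a copy back. */
    FactoryHolder roundTrip() {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(this);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                     new ByteArrayInputStream(bytes.toByteArray()))) {
                return (FactoryHolder) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}

class TransientFieldSketch {
    public static void main(String[] args) {
        // The lambda is not serializable, which is fine for a transient field.
        RMIServerSocketFactory serverSide = port -> new ServerSocket(port);
        FactoryHolder copy =
            new FactoryHolder(new PlainClientFactory(), serverSide).roundTrip();
        System.out.println("client factory present: " + (copy.clientFactory != null));
        System.out.println("server factory present: " + (copy.serverFactory != null));
    }
}
```

The copy keeps its client factory and loses the server factory, which is why the base client factory has to be serializable while the base server factory does not.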

The abstract wrap method

To keep this "wrapping of socket factories" general, we implement only the general mechanism here, and do the actual wrapping of the streams in subclasses. Then we can have a compressing/decompressing subclass, an encrypting one, a logging one, etc.

Here we only declare the wrap method:

/**
 * Wraps a pair of streams.
 * Subclasses must implement this method to do the actual
 * work.
 * @param input the input stream from the base socket.
 * @param output the output stream to the base socket.
 * @param server if true, we are constructing a socket in
 *    {@link ServerSocket#accept}. If false, this is a pure
 *   client socket.
 */
protected abstract StreamPair wrap(InputStream input,
                                   OutputStream output,
                                   boolean server);

This method (and the fact that Java doesn't allow multiple return values) is the reason for the StreamPair class. Alternatively we could have two separate methods, but in some cases (as for SSL) it is necessary to know which two streams are paired.

Client Socket Factory

Now, let's have a look at the client socket factory implementation:

/**
 * Creates a client socket and connects it to the given host/port pair.
 *
 * This retrieves a socket to the host/port from the base client
 * socket factory and then wraps a new socket (with a custom SocketImpl)
 * around it.
 * @param host the host we want to be connected with.
 * @param port the port we want to be connected with.
 * @return a new Socket connected to the host/port pair.
 * @throws IOException if something goes wrong.
 */
public Socket createSocket(String host, int port)
    throws IOException
{
    Socket baseSocket = baseCFactory.createSocket(host, port);

We retrieve a socket from our base factory, and then ...

    StreamPair streams = this.wrap(baseSocket.getInputStream(),
                                   baseSocket.getOutputStream(),
                                   false);

... wrap its streams in new streams. (This wrap method has to be implemented by subclasses, see below.)

    SocketImpl wrappingImpl = new WrappingSocketImpl(streams, baseSocket);

Then we use these streams to create our WrappingSocketImpl (see above), and pass it ...

    return new Socket(wrappingImpl) {
        public boolean isConnected() { return true; }
    };

... to a new Socket. We have to subclass Socket because this constructor is protected, but this is opportune since we also have to override the isConnected method to return true instead of false. (Remember, our SocketImpl is already connected, and does not support connecting.)

}

For client socket factories, this is already enough. For server socket factories, it gets a bit more complicated.

Wrapping ServerSockets

There seems to be no way to create a ServerSocket with a given SocketImpl object - it always uses the static SocketImplFactory. Thus we now subclass ServerSocket, simply ignoring its SocketImpl, instead delegating to another ServerSocket.

/**
 * A server socket subclass which wraps our custom sockets around the
 * sockets retrieved by a base server socket.
 *
 * We only override enough methods to work. Basically, this is
 * an unbound server socket, which handles {@link #accept} specially.
 */
private class WrappingServerSocket extends ServerSocket {
    private ServerSocket base;

    public WrappingServerSocket(ServerSocket b)
        throws IOException
    {
        this.base = b;
    }

It turns out we have to implement this getLocalPort, since this number is sent with the remote stub to the clients.

    /**
     * returns the local port this ServerSocket is bound to.
     */
    public int getLocalPort() {
        return base.getLocalPort();
    }

The next method is the important one. It works similarly to our createSocket() method above.

    /**
     * accepts a connection from some remote host.
     * This will accept a socket from the base socket, and then
     * wrap a new custom socket around it.
     */
    public Socket accept() throws IOException {

We let the base ServerSocket accept a connection, then wrap its streams:

        final Socket baseSocket = base.accept();
        StreamPair streams =
            WrappingSocketFactory.this.wrap(baseSocket.getInputStream(),
                                            baseSocket.getOutputStream(),
                                            true);

Then we create our WrappingSocketImpl, ...

        SocketImpl wrappingImpl =
            new WrappingSocketImpl(streams, baseSocket);

... and create another anonymous subclass of Socket:

        // For some reason, this seems to work only as an
        // anonymous direct subclass of Socket, not as an
        // external subclass. Strange.
        Socket result = new Socket(wrappingImpl) {
                public boolean isConnected() { return true; }
                public boolean isBound() { return true; }
                public int getLocalPort() {
                    return baseSocket.getLocalPort();
                }
                public InetAddress getLocalAddress() {
                    return baseSocket.getLocalAddress();
                }
            };

This one needs some more overridden methods, as these are called by the RMI engine, it seems.

I tried to put these in a separate (non-local) class, but this did not work (gave exceptions at the client side on connecting). I have no idea why. If someone has an idea, I'm interested.

        return result;
    }
}

Having this ServerSocket subclass, we can complete our ...

wrapping RMI server socket factory

/**
 * Creates a server socket listening on the given port.
 *
 * This retrieves a ServerSocket listening on the given port
 * from the base server socket factory, and then creates a 
 * custom server socket, which on {@link ServerSocket#accept accept}
 * wraps new Sockets (with a custom SocketImpl) around the sockets
 * from the base server socket.
 * @param port the port the server socket should listen on.
 * @return a new ServerSocket listening on the given port.
 * @throws IOException if something goes wrong.
 */
public ServerSocket createServerSocket(int port)
    throws IOException
{
    final ServerSocket baseSocket = getSSFac().createServerSocket(port);
    ServerSocket ss = new WrappingServerSocket(baseSocket);
    return ss;
}

Not much to say, it all is already in the comment. Yes, I know I could do this all in one line. (There originally were some debugging outputs between the lines.)

Let's finish the class:

}

Next time: a tracing socket factory.


A tracing socket factory.

To test our wrapping and see if there are enough flushes, here is the wrap method of a first subclass:

protected StreamPair wrap(InputStream in, OutputStream out, boolean server)
{
    InputStream wrappedIn = in;
    OutputStream wrappedOut = new FilterOutputStream(out) {
            public void write(int b) throws IOException {
                System.err.println("write(.)");
                super.write(b);
            }
            public void write(byte[] b, int off, int len)
                throws IOException {
                System.err.println("write(" + len + ")");
                super.out.write(b, off, len);
            }
            public void flush() throws IOException {
                System.err.println("flush()");
                super.flush();
            }
        };
    return new StreamPair(wrappedIn, wrappedOut);
}

The input stream is used as is, the output stream simply adds some logging.

On the server side, it looks like this (the [example] comes from ant):

  [example] write(14)
  [example] flush()
  [example] write(287)
  [example] flush()
  [example] flush()
  [example] flush()
  [example] write(1)
  [example] flush()
  [example] write(425)
  [example] flush()
  [example] flush()

We see that there are enough flushes, even more than enough. (The numbers are the lengths of the output chunks.) (On client side, this actually throws a java.rmi.NoSuchObjectException. It worked before ... no idea why it doesn't work now. As the compressing example does work and I'm tired, I'll not search for it now.)
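If reading a trace on System.err is inconvenient, the same FilterOutputStream technique can keep counters instead. This is only a sketch: CountingOutputStream and its replaySample() helper are hypothetical names, and the helper just replays the first two chunks of the trace above against an in-memory sink.

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;

/** Hypothetical variant of the tracing stream that counts instead of printing. */
class CountingOutputStream extends FilterOutputStream {
    private long bytes;
    private int flushes;

    CountingOutputStream(OutputStream out) {
        super(out);
    }

    @Override
    public void write(int b) throws IOException {
        out.write(b);
        bytes++;
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        // Delegate the whole chunk at once; FilterOutputStream would go byte by byte.
        out.write(b, off, len);
        bytes += len;
    }

    @Override
    public void flush() throws IOException {
        flushes++;
        out.flush();
    }

    long byteCount() { return bytes; }

    int flushCount() { return flushes; }

    /** Replays the first two chunks of the trace: 14 bytes, flush, 287 bytes, flush. */
    static CountingOutputStream replaySample() {
        CountingOutputStream counting =
            new CountingOutputStream(new ByteArrayOutputStream());
        try {
            counting.write(new byte[14]);
            counting.flush();
            counting.write(new byte[287]);
            counting.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return counting;
    }
}
```

A wrap implementation could return such a stream in its StreamPair and report the counters when the connection is closed. Note that FilterOutputStream.close() calls flush(), so a close adds one to the flush count.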

Next: compressing.


Flushing compressed streams

For compression, Java has some classes in the java.util.zip package. There is the pair DeflaterOutputStream/InflaterInputStream which implement the deflate compression algorithm by wrapping another stream, filtering the data through a Deflater or Inflater, respectively. Deflater and Inflater are based on native methods calling the common zlib library. (Actually, the streams could also support other algorithms, if someone provided subclasses with alternate implementations of Deflater and Inflater.)

(There are also DeflaterInputStream and InflaterOutputStream, which work the other way around.)

Based on this, GZIPOutputStream and GZIPInputStream implement the gzip file format. (This mainly adds a header, a footer, and a checksum.)

Both output streams have the problem (for our use case) that they don't truly support flush(). This is caused by a deficiency in the API definition of Deflater, which is allowed to buffer as much data as it wants until the final finish(). Zlib allows flushing its state, just the Java wrapper is too stupid to expose it.

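There is bug #4206909 open about this since January 1999, and it looks like it is finally fixed for Java 7, hurray! If you have Java 7, you can simply use DeflaterOutputStream here (created with its syncFlush constructor argument set to true, so that flush() really flushes the compressor).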
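For readers who are on Java 7 or later, the difference the syncFlush argument makes can be checked with a small sketch. SyncFlushSketch and readableAfterFlush are hypothetical names; the method writes a message, calls flush() without closing the stream, and then tries to inflate whatever has reached the underlying sink so far.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.DataFormatException;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.Inflater;

class SyncFlushSketch {

    /**
     * Writes the message, calls flush() (not close()), and returns the text
     * a receiver could decode from the bytes that reached the sink by then.
     */
    static String readableAfterFlush(String message, boolean syncFlush) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        byte[] onTheWire;
        try {
            DeflaterOutputStream out = new DeflaterOutputStream(sink, syncFlush);
            out.write(message.getBytes(StandardCharsets.UTF_8));
            out.flush();
            onTheWire = sink.toByteArray(); // snapshot before close() finishes the stream
            out.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        Inflater inflater = new Inflater();
        try {
            inflater.setInput(onTheWire);
            byte[] buffer = new byte[message.length() * 4 + 16];
            int n = inflater.inflate(buffer); // decodes only what is complete so far
            return new String(buffer, 0, n, StandardCharsets.UTF_8);
        } catch (DataFormatException e) {
            throw new IllegalStateException(e);
        } finally {
            inflater.end();
        }
    }

    public static void main(String[] args) {
        System.out.println("syncFlush=true:  '" + readableAfterFlush("hello", true) + "'");
        System.out.println("syncFlush=false: '" + readableAfterFlush("hello", false) + "'");
    }
}
```

With syncFlush set to true the whole message is decodable right after flush(). With false, a short message is still sitting in the Deflater's buffer at that point, which is exactly the situation that would leave an RMI peer waiting.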

Since I don't have Java 7, yet, I'll use the workaround posted in the bug comments on 23-JUN-2002 by rsaddey.

/**
 * Workaround for the broken GZIPOutputStream, from
 * http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4206909
 * (23-JUN-2002, rsaddey)
 * @see DecompressingInputStream
 */
public class CompressingOutputStream
    extends DeflaterOutputStream {


    public CompressingOutputStream (final OutputStream out)
    {
        super(out,
              // Using Deflater with nowrap == true will omit headers
              //  and trailers
              new Deflater(Deflater.DEFAULT_COMPRESSION, true));
    }

    private static final byte [] EMPTYBYTEARRAY = new byte[0];
    /**
     * Ensure all remaining data will be output.
     */
    public void flush() throws IOException {
        /**
         * Now this is tricky: We force the Deflater to flush
         * its data by switching compression level.
         * As yet, a perplexingly simple workaround for 
         *  http://developer.java.sun.com/developer/bugParade/bugs/4255743.html 
        */
        def.setInput(EMPTYBYTEARRAY, 0, 0);

        def.setLevel(Deflater.NO_COMPRESSION);
        deflate();

        def.setLevel(Deflater.DEFAULT_COMPRESSION);
        deflate();

        out.flush();
    }

    /**
     * We also end the (self-created) Deflater when
     * we are done.
     */
    public void close()
        throws IOException
    {
        super.close();
        def.end();
    }

} // class

/**
 * Workaround for the broken GZIPOutputStream, from
 * http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4206909
 * (23-JUN-2002, rsaddey)
 * @see CompressingOutputStream
 */
public class DecompressingInputStream extends InflaterInputStream {

    public DecompressingInputStream (final InputStream in) {
        // Using Inflater with nowrap == true will omit headers and trailers
        super(in, new Inflater(true));
    }

    /**
     * available() should return the number of bytes that can be read without
     * running into blocking wait. Accomplishing this feat would eventually
     * require to pre-inflate a huge chunk of data, so we rather opt for a
     * more relaxed contract (java.util.zip.InflaterInputStream does not 
     * fit the bill). 
     * This code has been tested to work with BufferedReader.readLine();
     */
    public int available() throws IOException {
        if (!inf.finished() && !inf.needsInput()) {
            return 1;
        } else {
            return in.available();
        }
    }

    /**
     * We also end the (self-created) Inflater when
     * we are done.
     */
    public void close()
        throws IOException
    {
        super.close();
        inf.end();
    }

} //class

(These are in the de.fencing_game.tools package in my github repository. The versions there have some German comments, since I originally copied this a year ago for another project of mine.)

Searching a bit on Stack Overflow, I found this answer by BalusC to a related question, which offers another compressing OutputStream with optimized flushing. I did not test it, but it might be an alternative to this one. (It uses the gzip format, while we are using the pure deflate format here. Make sure the writing and the reading stream fit together.)

Another alternative would be using JZlib, as bestsss proposed, with its ZOutputStream and ZInputStream. It doesn't have much documentation, but I'm working on it.

Next time: compressed RMI socket factory


Compressing RMI socket factory

Now we can pull it all together.

/**
 * An RMISocketFactory which enables compressed transmission.
 * We use {@link DecompressingInputStream} and {@link CompressingOutputStream}
 * for this.
 *
 * As we extend WrappingSocketFactory, this can be used on top of another
 * {@link RMISocketFactory}.
 */
public class CompressedRMISocketFactory
    extends WrappingSocketFactory
{

    private static final long serialVersionUID = 1;

    //------------ Constructors -----------------

    /**
     * Creates a CompressedRMISocketFactory based on a pair of
     * socket factories.
     *
     * @param cFac the base socket factory used for creating client
     *   sockets. This may be {@code null}, then we will use the
     *  {@linkplain RMISocketFactory#getDefault() default socket factory}
     *  of client system where this object is finally used for
     *   creating sockets.
     *   If not null, it should be serializable.
     * @param sFac the base socket factory used for creating server
     *   sockets. This may be {@code null}, then we will use the
     *  {@linkplain RMISocketFactory#getDefault() default RMI Socket factory}.
     *  This will not be serialized to the client.
     */
    public CompressedRMISocketFactory(RMIClientSocketFactory cFac,
                                      RMIServerSocketFactory sFac) {
        super(cFac, sFac);
    }

    // [snipped more constructors]

    //-------------- Implementation -------------

    /**
     * wraps a pair of streams into compressing/decompressing streams.
     */
    protected StreamPair wrap(InputStream in, OutputStream out,
                              boolean server)
    {
        return new StreamPair(new DecompressingInputStream(in),
                              new CompressingOutputStream(out));
    }
}

That's it. We now provide this factory object to UnicastRemoteObject.exportObject(...) as arguments (both for the client and the server factory), and all the communication will be compressed. (The version in my github repository has a main method with an example.)

Of course, the compression benefits will not be huge for things like RMI, at least when you don't transfer large strings or similar stuff as arguments or return values.
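To get a feeling for this, one can measure how many bytes reach the underlying stream after a single write followed by a flush. The sketch below assumes Java 7 or later and uses the JDK's own sync-flushing DeflaterOutputStream rather than the CompressingOutputStream above; CompressionGainSketch and flushedSize are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;

class CompressionGainSketch {

    /** Returns how many bytes reach the sink after one write plus one sync flush. */
    static int flushedSize(byte[] payload) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        // nowrap == true: raw deflate format, as in the streams above
        Deflater deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true);
        try (DeflaterOutputStream out =
                 new DeflaterOutputStream(sink, deflater, true)) {
            out.write(payload);
            out.flush();
            return sink.size(); // measured before close() appends the final block
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            deflater.end(); // we created the Deflater, so we release it
        }
    }

    public static void main(String[] args) {
        byte[] small = "ping".getBytes(StandardCharsets.UTF_8);
        byte[] large = new byte[10_000];
        Arrays.fill(large, (byte) 'a');
        System.out.println("small: " + small.length + " -> " + flushedSize(small));
        System.out.println("large: " + large.length + " -> " + flushedSize(large));
    }
}
```

A tiny payload comes out larger than it went in, since every sync flush appends a few marker bytes, while a large repetitive payload shrinks considerably. Real RMI traffic will sit somewhere in between, so measuring with your own remote calls is the only reliable guide.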

Next time (after I have slept): combining with an SSL socket factory.


Combining with an SSL socket factory

The Java part of this is easy, if we use the default classes:

CompressedRMISocketFactory fac =
    new CompressedRMISocketFactory(new SslRMIClientSocketFactory(),
                   new SslRMIServerSocketFactory());

These classes (in javax.rmi.ssl) use the default SSLSocketFactory and SSLServerSocketFactory (in javax.net.ssl), which use the system's default keystore and trust store.

Thus it is necessary to create a key store with a key pair (for example with keytool -genkeypair -v), and provide this to the VM with the system properties javax.net.ssl.keyStore (the file name of the key store) and javax.net.ssl.keyStorePassword (the password for the key store).

On the client side, we need a trust store, i.e. a key store containing the public keys, or some certificate which signed the public keys of the server. For testing purposes, we can simply use the same key store as the server; for production you certainly would not want the server's private key on the client side. We provide this with the properties javax.net.ssl.trustStore and javax.net.ssl.trustStorePassword.
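These properties are usually passed with -D on the command line, but they can also be set from code. A minimal sketch, assuming it runs before the first SSL socket is created (the default SSL context is normally built only once); SslStoreConfig is a hypothetical helper, and the file names and passwords are placeholders.

```java
class SslStoreConfig {

    /** Points the default JSSE context at the given key store and trust store. */
    static void useStores(String keyStorePath, String keyStorePassword,
                          String trustStorePath, String trustStorePassword) {
        // Needed on the server side: the key pair the server presents.
        System.setProperty("javax.net.ssl.keyStore", keyStorePath);
        System.setProperty("javax.net.ssl.keyStorePassword", keyStorePassword);
        // Needed on the client side: the certificates the client trusts.
        System.setProperty("javax.net.ssl.trustStore", trustStorePath);
        System.setProperty("javax.net.ssl.trustStorePassword", trustStorePassword);
    }

    public static void main(String[] args) {
        // Placeholder values; replace them with your own files and passwords.
        useStores("server-keys.jks", "changeit", "client-trust.jks", "changeit");
        System.out.println(System.getProperty("javax.net.ssl.keyStore"));
    }
}
```

In a real setup the server only needs the key store properties and the client only the trust store ones; setting all four in one place is just for brevity.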

Then it gets down to this (on the server side):

    Remote server =
        UnicastRemoteObject.exportObject(new EchoServerImpl(),
                                         0, fac, fac);
    System.err.println("server: " + server);

    Registry registry =
        LocateRegistry.createRegistry(Registry.REGISTRY_PORT);

    registry.bind("echo", server);

The client is a stock client as for the previous examples:

    Registry registry =
        LocateRegistry.getRegistry("localhost",
                                   Registry.REGISTRY_PORT);

    EchoServer es = (EchoServer)registry.lookup("echo");
    System.err.println("es: " + es);
    System.out.println(es.echo("hallo"));

Now all communication to the EchoServer runs compressed and encrypted. Of course, for complete security we also would want the communication to the registry SSL-protected, to avoid any man-in-the-middle attacks (which would allow also intercepting communication to the EchoServer by giving the client a fake RMIClientSocketFactory, or fake server address).
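Protecting the registry could look roughly like the following sketch. It relies on the LocateRegistry overloads that accept socket factories and uses the stock javax.rmi.ssl factories; SslRegistrySketch and its method names are hypothetical, and this setup was not part of the tested example above.

```java
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import javax.rmi.ssl.SslRMIClientSocketFactory;
import javax.rmi.ssl.SslRMIServerSocketFactory;

class SslRegistrySketch {

    /** Server side: creates a registry that accepts only SSL connections. */
    static Registry createSslRegistry(int port) {
        try {
            return LocateRegistry.createRegistry(port,
                                                 new SslRMIClientSocketFactory(),
                                                 new SslRMIServerSocketFactory());
        } catch (RemoteException e) {
            throw new IllegalStateException(e);
        }
    }

    /**
     * Client side: builds a registry stub that will connect over SSL.
     * No connection is made until a method such as lookup() is called.
     */
    static Registry locateSslRegistry(String host, int port) {
        try {
            return LocateRegistry.getRegistry(host, port,
                                              new SslRMIClientSocketFactory());
        } catch (RemoteException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The server would call createSslRegistry(Registry.REGISTRY_PORT) in place of the plain createRegistry call, and the client would use locateSslRegistry("localhost", Registry.REGISTRY_PORT) before its lookup. The key store and trust store properties described above apply to these connections as well.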