如何使用 RSocket 创建文件发送客户端/服务器?

Jam*_*mes 5 java file-sharing download aeron rsocket

我似乎无法在RSocket上找到任何资源/教程,只能在 GitHub 上阅读他们的代码,我不明白。

我的服务器上有一个文件路径: String serverFilePath;

我希望能够从我的客户端下载它(最好使用RSocket 的 Aeron 实现)。有谁知道如何使用 RSocket 做到这一点?

提前致谢。

小智 3

我从事 RSocket 工作,并编写了 Java 版本的很大一部分,包括 Aeron 传输。

目前我不建议使用 Aeron 实现。您可以通过以下几种方式发送文件:

  1. 使用 requestChannel 将数据推送到远程服务器。
  2. 使用 requestChannel 或 requestStream 将字节流传输到客户端。

这是使用 requestStream 的示例:

  public class FileCopy {

  public static void main(String... args) throws Exception {

    // Create a socket that receives incoming connections
    RSocketFactory.receive()
        .acceptor(
            new SocketAcceptor() {
              @Override
              // Create a new socket acceptor
              public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket) {
                return Mono.just(
                    new AbstractRSocket() {
                      @Override
                      public Flux<Payload> requestStream(Payload payload) {
                        // Get the path of the file to copy
                        String path = payload.getDataUtf8();
                        SeekableByteChannel _channel = null;

                        try {
                          _channel = Files.newByteChannel(Paths.get(path), StandardOpenOption.READ);
                        } catch (IOException e) {
                          return Flux.error(e);
                        }

                        ReferenceCountUtil.safeRelease(payload);

                        SeekableByteChannel channel = _channel;
                        // Use Flux.generate to create a publisher that returns file at 1024 bytes
                        // at a time
                        return Flux.generate(
                            sink -> {
                              try {
                                ByteBuffer buffer = ByteBuffer.allocate(1024);
                                int read = channel.read(buffer);
                                buffer.flip();
                                sink.next(DefaultPayload.create(buffer));

                                if (read == -1) {
                                  channel.close();
                                  sink.complete();
                                }
                              } catch (Throwable t) {
                                sink.error(t);
                              }
                            });
                      }
                    });
              }
            })
        .transport(TcpServerTransport.create(9090))
        .start()
        .subscribe();

    String path = args[0];
    String dest = args[1];

    // Connect to a server
    RSocket client =
        RSocketFactory.connect().transport(TcpClientTransport.create(9090)).start().block();

    File f = new File(dest);
    f.createNewFile();

    // Open a channel to a new file
    SeekableByteChannel channel =
        Files.newByteChannel(f.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);

    // Request a stream of bytes
    client
        .requestStream(DefaultPayload.create(path))
        .doOnNext(
            payload -> {
              try {
                // Write the bytes received to the new file
                ByteBuffer data = payload.getData();
                channel.write(data);

                // Release the payload
                ReferenceCountUtil.safeRelease(payload);
              } catch (Exception e) {
                  throw new RuntimeException(e);
              }
            })
        // Block until all the bytes are received
        .blockLast();

    // Close the file you're writing too
    channel.close();
  }
}
Run Code Online (Sandbox Code Playgroud)