如何为 Java 11 HttpRequest 创建自定义 BodyPublisher

Cha*_*lle 5 java java-http-client java-11

我正在尝试创建一个自定义BodyPublisher来反序列化我的 JSON 对象。我可以在创建请求时反序列化 JSON 并使用 的ofByteArray方法,BodyPublishers但我宁愿使用自定义发布者。

public class CustomPublisher implements HttpRequest.BodyPublisher {
    private byte[] bytes;
    
    public CustomPublisher(ObjectNode jsonData) {
        ...
        // Deserialize jsonData to bytes
        ...
    }
    
    @Override
    public long contentLength() {
        if(bytes == null) return 0;
        return bytes.length
    }
    
    @Override
    public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
        CustomSubscription subscription = new CustomSubscription(subscriber, bytes);
        subscriber.onSubscribe(subscription);       
    }

    private CustomSubscription implements Flow.Subscription {
         private final Flow.Subscriber<? super ByteBuffer> subscriber;
         private boolean cancelled;
         private Iterator<Byte> byterator;

         private CustomSubscription(Flow.Subscriber<? super ByteBuffer> subscriber, byte[] bytes) {
             this.subscriber = subscriber;
             this.cancelled = false;
             List<Byte> bytelist = new ArrayList<>();
             for(byte b : bytes) {
                 bytelist.add(b);
             }
             this.byterator = bytelist.iterator();
         }

         @Override
         public void request(long n) {
             if(cancelled) return;
             if(n < 0) {
                 subscriber.onError(new IllegalArgumentException());
             } else if(byterator.hasNext()) {
                 subscriber.onNext(ByteBuffer.wrap(new byte[]{byterator.next()));
             } else {
                 subscriber.onComplete();
             }
         }

         @Override
         public void cancel() {
             this.cancelled = true;
         }
    }
}
Run Code Online (Sandbox Code Playgroud)

此实现有效,但request前提是使用 1 作为参数调用subscriptions方法。但这就是我将它与 HttpRequest 一起使用时会发生的情况。

我很确定这不是创建自定义订阅的任何首选或最佳方式,但我还没有找到更好的方法来使它工作。

如果有人能引导我走上更好的道路,我将不胜感激。

VGR*_*VGR 2

避免用它创建字节数组是正确的,因为这会给大对象带来内存问题。

\n

我不会尝试编写自定义发布器。相反,只需利用工厂方法HttpRequest.BodyPublishers.ofInputStream即可。

\n
HttpRequest.BodyPublisher publisher =\n    HttpRequest.BodyPublishers.ofInputStream(() ->  {\n        PipedInputStream in = new PipedInputStream();\n\n        ForkJoinPool.commonPool().submit(() -> {\n            try (PipedOutputStream out = new PipedOutputStream(in)) {\n                objectMapper.writeTree(\n                    objectMapper.getFactory().createGenerator(out),\n                    jsonData);\n            }\n            return null;\n        });\n\n        return in;\n    });\n
Run Code Online (Sandbox Code Playgroud)\n

正如您所指出的,您可以使用HttpRequest.BodyPublishers.ofByteArray. 对于相对较小的对象来说这很好,但我出于习惯而对可扩展性进行编程。假设代码不需要扩展的问题是其他开发人员会假设传递大型对象是安全的,而没有意识到对性能的影响。

\n

编写自己的身体出版商将是一项艰巨的工作。它的subscribe方法继承自Flow.Publisher

\n

该方法的文档subscribe以此开头:

\n
\n

如果可能,添加给定的订阅者。

\n
\n

每次subscribe调用您的方法时,您都需要将订阅者添加到某种集合中,需要创建Flow.Subscription的实现,并且需要立即将其传递给订阅者\xe2\x80\x99sonSubscribe方法。request仅当调用Subscription\xe2\x80\x99s 方法时,通过调用相应的 Subscriber\xe2\x80\x99s(不仅仅是任何 Subscriber\xe2\x80\x99s),您的 Subscription 实现对象才需要发回一个或多个 ByteBuffer onNext方法,一旦您\xe2\x80\x99 发送了所有数据,您必须调用相同的 Subscriber\xe2\x80\x99sonComplete()方法。最重要的是,订阅实现对象需要处理cancel请求。

\n

您可以通过扩展SubmissionPublisher(这是 Flow.Publisher 的默认实现),然后添加contentLength()。但正如 SubmissionPublisher 文档所示,即使是最小的工作实现,您仍然有相当多的工作要做。

\n

HttpRequest.BodyPublishers.of\xe2\x80\xa6 方法将为您完成所有这些工作。 ofByteArray对于小对象来说没问题,但ofInputStream对于你可能传入的任何对象都适用。

\n

  • 提交将数据流式传输到 PipedInputStream 到公共池的函数应突出显示为有风险。通常,提交到公共池的任务不应阻塞,因为池很小(CPU 计数 -1),并且如果它被充分利用,系统中的其他任务可能会变得不稳定。例如:HttpClient 使用公共池进行异步 IO。 (3认同)