Reactive Spring WebClient - 进行SOAP调用

Fai*_*ood 11 soap-client spring-boot reactive spring-webflux

我期待从Spring反应webclient进行SOAP调用.我找不到任何文件.想知道这个方法会是什么.现在我在想

  1. 在单独的线程池上使用JAXB构造SOAP消息
  2. 通过webclient将其转换为字符串来进行调用
  3. 在返回单独的tp的路上,使用jaxb转换回java.

有什么缺点和其他方法?

Pet*_*nya 6

您需要生成 SOAP 客户端作为具有异步方法的存根类。JAX-WS API 支持异步调用。使用wsiimportenableAsyncMapping生成方法 operationAsync(Input request, AsyncHandler asyncHandler);

AsyncHandler 使用 Mono.create() 创建

Service service = new Service();
ServicePortType portType = service.getPortType();

public Mono<Output> operation(Input input) {
            return Mono.create(sink ->
               portType.operation(input, outputFuture -> {
                   try {
                       sink.success(outputFuture.get());
                   } catch (Exception e) {
                       sink.error(e);
                   }
               })
            );
        }
Run Code Online (Sandbox Code Playgroud)

你会反应性地得到 Mono

我在https://blog.godatadriven.com/jaxws-reactive-client帖子中找到了建议

  • 此解决方案不使用 Spring Reactor 的 WebClient。 (4认同)
  • 欢迎提供解决方案的链接,但请确保您的答案在没有它的情况下也有用:[在链接周围添加上下文](//meta.stackexchange.com/a/8259) 这样您的其他用户就会知道它是什么以及为什么它在那里,然后引用您链接到的页面中最相关的部分,以防目标页面不可用。[仅是链接的答案可能会被删除。](//stackoverflow.com/help/deleted-answers) (3认同)

gun*_*gor 6

这是 Spring Reactor 的一个工作示例:https : //github.com/gungor/spring-webclient-soap

您需要将生成的 JAXB 类包含在带有自定义编码器的肥皂封套中,如下所示,然后将其添加到 WebClient 的交换策略中。

package webclient.soap.encoding;

import org.reactivestreams.Publisher;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.CodecException;
import org.springframework.core.codec.Encoder;
import org.springframework.core.codec.EncodingException;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.PooledDataBuffer;
import org.springframework.util.ClassUtils;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
import org.springframework.ws.WebServiceMessage;
import org.springframework.ws.WebServiceMessageFactory;
import org.springframework.ws.client.core.WebServiceTemplate;
import org.springframework.ws.support.DefaultStrategiesHelper;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.MarshalException;
import javax.xml.bind.Marshaller;
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlType;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class Jaxb2SoapEncoder implements Encoder<Object> {

    private final JaxbContextContainer jaxbContexts = new JaxbContextContainer();

    @Override
    public boolean canEncode(ResolvableType elementType, MimeType mimeType) {
        Class<?> outputClass = elementType.toClass();
        return (outputClass.isAnnotationPresent(XmlRootElement.class) ||
                    outputClass.isAnnotationPresent(XmlType.class));

    }

    @Override
    public Flux<DataBuffer> encode(Publisher<?> inputStream, DataBufferFactory bufferFactory, ResolvableType elementType, MimeType mimeType, Map<String, Object> hints) {
        return Flux.from(inputStream)
                .take(1)
                .concatMap(value -> encode(value, bufferFactory, elementType, mimeType, hints))
                .doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release);
    }

    @Override
    public List<MimeType> getEncodableMimeTypes() {
        return Arrays.asList( MimeTypeUtils.TEXT_XML );
    }



    private Flux<DataBuffer> encode(Object value ,
                                    DataBufferFactory bufferFactory,
                                    ResolvableType type,
                                    MimeType mimeType,
                                    Map<String, Object> hints){

        return Mono.fromCallable(() -> {
            boolean release = true;
            DataBuffer buffer = bufferFactory.allocateBuffer(1024);
            try {
                OutputStream outputStream = buffer.asOutputStream();
                Class<?> clazz = ClassUtils.getUserClass(value);
                Marshaller marshaller = initMarshaller(clazz);

                // here should be optimized
                DefaultStrategiesHelper helper = new DefaultStrategiesHelper(WebServiceTemplate.class);
                WebServiceMessageFactory messageFactory = helper.getDefaultStrategy(WebServiceMessageFactory.class);
                WebServiceMessage message = messageFactory.createWebServiceMessage();

                marshaller.marshal(value, message.getPayloadResult());
                message.writeTo(outputStream);

                release = false;
                return buffer;
            }
            catch (MarshalException ex) {
                throw new EncodingException(
                        "Could not marshal " + value.getClass() + " to XML", ex);
            }
            catch (JAXBException ex) {
                throw new CodecException("Invalid JAXB configuration", ex);
            }
            finally {
                if (release) {
                    DataBufferUtils.release(buffer);
                }
            }
        }).flux();
    }


    private Marshaller initMarshaller(Class<?> clazz) throws JAXBException {
        Marshaller marshaller = this.jaxbContexts.createMarshaller(clazz);
        marshaller.setProperty(Marshaller.JAXB_ENCODING, StandardCharsets.UTF_8.name());
        return marshaller;
    }
}
Run Code Online (Sandbox Code Playgroud)

网络客户端配置

@Bean
    public WebClient webClient(){
        TcpClient tcpClient = TcpClient.create();

        tcpClient
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                .doOnConnected(connection -> {
                    connection.addHandlerLast(new ReadTimeoutHandler(5000, TimeUnit.MILLISECONDS));
                    connection.addHandlerLast(new WriteTimeoutHandler(5000, TimeUnit.MILLISECONDS));
                });

        ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder().codecs( clientCodecConfigurer -> {
            clientCodecConfigurer.customCodecs().encoder(new Jaxb2SoapEncoder());
        }).build();

        WebClient webClient = WebClient.builder()
                .clientConnector(new ReactorClientHttpConnector(HttpClient.from(tcpClient).wiretap(true)))
                .exchangeStrategies( exchangeStrategies )
                .build();

        return webClient;
    }
Run Code Online (Sandbox Code Playgroud)

网络客户端

public void call(GetCountryRequest getCountryRequest) throws SOAPException, ParserConfigurationException, IOException {

        webClient.post()
                .uri( soapServiceUrl )
                .contentType(MediaType.TEXT_XML)
                .body( Mono.just(getCountryRequest) , GetCountryRequest.class  )
                .retrieve()
                .onStatus(
                        HttpStatus::isError,
                        clientResponse ->
                                clientResponse
                                        .bodyToMono(String.class)
                                        .flatMap(
                                                errorResponseBody ->
                                                        Mono.error(
                                                                new ResponseStatusException(
                                                                        clientResponse.statusCode(),
                                                                        errorResponseBody))))

                .bodyToMono(GetCountryResponse.class)
                .doOnSuccess( (GetCountryResponse response) -> {
                    //handle success
                })
                .doOnError(ResponseStatusException.class, error -> {
                    //handle error
                })
                .subscribe();

    }
Run Code Online (Sandbox Code Playgroud)