Scala Netty如何为基于字节数据的协议创建一个简单的客户端?

aut*_*nix 2 scala netty

本课题旨在介绍我在处理当前项目时遇到的问题.我将在下面回答我的解决方案.

我正在开发一个项目,要求我连接到一个数据馈送服务器,该服务器具有传输数据的专有协议,基本上以GZIP格式在TCP协议的数据部分编码,需要提取.

来自数据提供程序的数据协议的示例应用程序使用Java中的简单套接字.我想将它改编为scala/netty.另外,值得注意的是,所提供的数据可以分布在多个分组上.

我一直在寻找关于如何使用Netty.io来创建一个简单的客户端应用程序的简单而简洁的示例,但所有示例看起来都过于复杂,并且缺乏足够的解释来简单地实现此目的.更重要的是,很多netty/scala示例都面向服务器应用程序.

" 入门 "网络教程也缺乏足够的解释,以便在实际入门时轻松导航.

问题是,如何实现连接到服务器的简单netty应用程序,接收数据并解析结果?

以下是我为了尝试理解这个概念而考虑的一些例子:

aut*_*nix 7

在尝试使用套接字将java应用程序复制到使用Netty的更复杂方法时,我遇到了这个问题.

我解决问题的方法是了解建立连接所需的netty库的各种元素:

这3个元素确保创建和管理连接以进行进一步处理.

此外,使用Netty时还需要一些其他元素:

信道初始化器负责准备管道,其基本上通过一系列"过滤器"传递入站和出站数据,以便处理不同级别的数据,每个级别接收由先前编码器/解码器处理的数据.

以下是netty文档中提供的管道工作方式:



                                                    I/O Request
                                                via Channel or
                                            ChannelHandlerContext
                                                          |
      +---------------------------------------------------+---------------+
      |                           ChannelPipeline         |               |
      |                                                  \|/              |
      |    +---------------------+            +-----------+----------+    |
      |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
      |    +----------+----------+            +-----------+----------+    |
      |              /|\                                  |               |
      |               |                                  \|/              |
      |    +----------+----------+            +-----------+----------+    |
      |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
      |    +----------+----------+            +-----------+----------+    |
      |              /|\                                  .               |
      |               .                                   .               |
      | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
      |        [ method call]                       [method call]         |
      |               .                                   .               |
      |               .                                  \|/              |
      |    +----------+----------+            +-----------+----------+    |
      |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
      |    +----------+----------+            +-----------+----------+    |
      |              /|\                                  |               |
      |               |                                  \|/              |
      |    +----------+----------+            +-----------+----------+    |
      |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
      |    +----------+----------+            +-----------+----------+    |
      |              /|\                                  |               |
      +---------------+-----------------------------------+---------------+
                      |                                  \|/
      +---------------+-----------------------------------+---------------+
      |               |                                   |               |
      |       [ Socket.read() ]                    [ Socket.write() ]     |
      |                                                                   |
      |  Netty Internal I/O Threads (Transport Implementation)            |
      +-------------------------------------------------------------------+

在问题的原始上下文的情况下,没有预设解码器允许解析具有预定字节的自定义数据.实质上,这意味着必须创建入站数据的自定义解码器.

让我们首先回顾一下作为客户端应用程序启动的连接的基础知识:


    import io.netty.bootstrap.Bootstrap
    import io.netty.channel.nio.NioEventLoopGroup
    import io.netty.channel.socket.nio.NioSocketChannel
    import io.netty.channel.socket.SocketChannel

    object App {
      def main(args: Array[String]){
        connect()
      }

      def connect() { 
        val host = "host.example.com"
        val port = 9999 
        val group = new NioEventLoopGroup() // starts the event loop group

        try {
          var b = new Bootstrap() // creates the netty bootstrap 
            .group(group) // associates the NioEventLoopGroup to the bootstrap
            .channel(classOf[NioSocketChannel]) // associates the channel to the bootstrap
            .handler(MyChannelInitializer) // provides the handler for dealing with the incoming/outgoing data on the channel


          var ch = b.connect(host, port).sync().channel() //initiates the connection to the server and links it to the netty channel

          ch.writeAndFlush("SERVICE_REQUEST") // sends the request to the server

          ch.closeFuture().sync() // keeps the connection alive instead of shutting down the channel after receiving the first packet
        }
        catch {
          case t: Throwable => t.printStackTrace(); group.shutdownGracefully() 
        }
        finally {
          group.shutdownGracefully() // Shutdown the event group
        }
      }
    }

MyChannelInitializer 在启动引导程序时调用的部分将负责告诉程序如何处理传入和传出的数据消息:


    import io.netty.channel.ChannelInitializer
    import io.netty.channel.socket.SocketChannel
    import io.netty.handler.codec.string.StringEncoder

    object MyChannelInitializer extends ChannelInitializer[SocketChannel] {

      val STR_ENCODER = new StringEncoder // Generic StringEecoder from netty to simply allow a string to be prepared and sent out to the server

      def initChannel(ch: SocketChannel) {
        val pipeline = ch.pipeline() // loads the pipeline associated with the channel

        // Decode Message
        pipeline.addLast("packet-decoder",MyPacketDecoder) // first data "filter" to extract the necessary bytes for the second filter
        pipeline.addLast("gzip-inflater", MyGZipDecoder) // second "filter" to unzip the contents

        // Encode String to send
        pipeline.addLast("command-encoder",STR_ENCODER) // String encoder for outgoing data

        // Handler
        pipeline.addLast("message-handler", MyMessageHandler) // Handles the end data after all "filters" have been applied
      }
    }

在这个实例中,第一个管道项MyPacketDecoder已经被创建为ReplayingDecoder的子类,它提供了一种简单的方法来执行数据包重建,以便为消息提供所有必需的字节.(简单地说,等待所有字节在ByteBuf变量中收集,然后再继续)

了解ByteBuf如何工作对于这种类型的应用程序非常重要,尤其是read和get方法之间的区别,它们分别允许读取和移动读取索引或简单地读取数据而不影响读取器索引.

MyPacketDecoder下面提供了一个例子


    import io.netty.handler.codec.ReplayingDecoder
    import io.netty.channel.ChannelHandlerContext
    import io.netty.buffer.ByteBuf
    import java.util.List

    object MyPacketDecoder extends ReplayingDecoder[Int] {

      val READ_HEADER = 0
      val READ_CONTENT = 1

      super.state(READ_HEADER) // sets the initial state of the Decoder by calling the superclass constructor

      var blockSize:Int = 0 // size of the data expected, published by the received data from the server, will vary according to your case, there may be additional header bytes before the actual data to be processed    

      def decode(ctx: ChannelHandlerContext,in: ByteBuf,out: List[AnyRef]): Unit = {

        var received_size = in.readableBytes()

        if(state() == READ_HEADER){
          blockSize = in.readInt() // header data with the size of the expected data to be received in the current and following packets if segmented

          checkpoint(READ_CONTENT) // change the state of the object in order to proceed to obtaining all the required bytes necessary for the message to be valid
        }
        else if(state() == READ_CONTENT){

          var bytes = new Array[Byte](blockSize)
          in.getBytes(0,bytes,0,blockSize) // adds collected bytes to the by array for the expected size as defined by the blockSize variable

          var frame = in.readBytes(blockSize) // creates the bytebuf to be passed to the next "filter"

          checkpoint(READ_HEADER) // changes the state preparing for the next message
          out.add(frame) // passes the data to the next "filter"
        }
        else {
          throw new Error("Case not covered Exception")
        }  
      }

    }

前面的代码将所有数据包的接收字节从预期的字节大小中取出,并将其传递给后续的管道级别.

下一个管道级别处理接收数据的GZIP解压缩.这由MyGZipDecoder对象确保,该对象被定义为ByteToMessageDecoder抽象对象的子类,以便将Byte信息作为接收数据处理:


    import io.netty.handler.codec.ByteToMessageDecoder
    import io.netty.channel.ChannelHandlerContext
    import io.netty.buffer.ByteBuf
    import java.net._
    import java.io._
    import java.util._
    import java.util.zip._
    import java.text._

    object MyGZipDecoder extends ByteToMessageDecoder {

      val MAX_DATA_SIZE = 100000

      var inflater = new Inflater(true)
      var compressedData = new Array[Byte](MAX_DATA_SIZE)
      var uncompressedData = new Array[Byte](MAX_DATA_SIZE)

      def decode(ctx: ChannelHandlerContext,in: ByteBuf,out: List[AnyRef]): Unit = {

        var received_size = in.readableBytes() // reads the number of available bytes

        in.readBytes(compressedData, 0, received_size) // puts the bytes into a Byte array

        inflater.reset();
        inflater.setInput(compressedData, 0, received_size) // prepares the inflater for decompression of the data
        var resultLength = inflater.inflate(uncompressedData) // decompresses the data into the uncompressedData Byte array

        var message = new String(uncompressedData) // generates a string from the uncompressed data

        out.add(message) // passes the data to the next pipeline level
      }
    }

该解码器解压缩在分组中接收的压缩数据,并将数据作为从在该级别接收的解码字节获得的字符串发送到下一级别.

最后一个难题是MyMessageHandler基本上对数据进行最终处理以满足应用程序所需目的的对象.这是SimpleChannelInboundHandler的子类,其String参数需要作为通道数据的消息:


    import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler}
    import io.netty.channel.ChannelHandler.Sharable

    @Sharable
    object QMMessageHandler extends SimpleChannelInboundHandler[String] {

      def channelRead0(ctx: ChannelHandlerContext, msg: String) { 

        println("Handler => Received message: "+msg) 
        // Do your data processing here however you need for the application purposes

      }
    }

这基本上完成了连接到服务器的这个特定示例的要求,该服务器使用对基础分组数据的GZip压缩来提供专有数据协议中的数据.

希望这可以作为尝试实现类似场景的人的良好基础,但主要思想是需要一些自定义来为专有协议创建适应性处理.

此外,重要的是要注意这种类型的实现并非真正用于简单的客户端 - 服务器连接,而是用于需要netty库提供的数据的可分发性/可伸缩性的应用程序(即同时进行多个并发连接)并广播数据).

对于在写这个答案时我可能错过的任何错误,我提前道歉.

我希望这个简短的教程可以帮助其他人,因为我个人不得不花费一些令人沮丧的时间从网上的点点滴滴中找出它.