在Netty的实践中很多场景都会用到自定义报文,那么MessageToMessageEncoder编码器,几乎比如会遇到。

从类名层面来说,第二个Message可以理解为任意一个对象。如果是使用ByteBuf对象的话,就和MessageToByte的原理一样。

使用时,要在MessageToMessageDecoder\<ByteBuf>的解码器里面对ByteBuf类型的对象进行解码操作,解析为所需的格式或报文。

本文以字符串的编码和解析为实例(关注公众号:程序新视界,回复“1006”,接收完整实例和更多代码)来进行讲解,通过自定义编码器和解码器,便可以在Netty中便直接发送和接收String类型的数据。

解码器的本质就是将原始字符数据与自定义的消息对象进行转换。网络中数据的传输都是以字节码形式的,client编码发送到server,server进行解码,反之一样。因此,编码器和解码器往往是成对出现的。

了解了上述基本原理,下面来看看具体实现代码。

首先看编码器部分StringEncoder:

/**
 * 自定义字符串编码器
 * 来源:公众号,程序新视界
 * @author sec
 * @version 1.0
 * @date 2020/12/22
 **/
public class StringEncoder extends MessageToMessageEncoder<CharSequence> {

    private final Charset charset;

    public StringEncoder() {
        this(Charset.defaultCharset());
    }

    public StringEncoder(Charset charset) {
        if (charset == null) {
            throw new NullPointerException("charset");
        }
        this.charset = charset;
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, CharSequence msg, List<Object> out) {
        // 发送消息为空直接返回
        if (msg.length() == 0) {
            return;
        }
        out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg),
                this.charset));
    }
}

对应的解码器StringDecoder:

/**
 * 字符串解码器
 * 来源:公众号,程序新视界
 * @author sec
 * @version 1.0
 * @date 2020/12/22
 **/
public class StringDecoder extends MessageToMessageDecoder<ByteBuf> {

    private final Charset charset;

    public StringDecoder() {
        this(Charset.defaultCharset());
    }

    public StringDecoder(Charset charset) {
        if (charset == null) {
            throw new NullPointerException("charset");
        }
        this.charset = charset;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg,
                          List<Object> out) {
        out.add(msg.toString(this.charset));
    }
}

其中,编码器和解码器都需要在client和server中进行配置指定,先看server端代码:

/**
 * 服务器端
 *
 * 来源:公众号,程序新视界
 * @author sec
 * @version 1.0
 * @date 2020/12/22
 **/
public class Server {

    public static void main(String[] args) throws Exception {
        int port = 9998;
        new Server().bind(port);
    }

    public void bind(int port) throws Exception {
        // 服务器线程组 用于网络事件的处理 一个用于服务器接收客户端的连接
        // 另一个线程组用于处理SocketChannel的网络读写
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // NIO服务器端的辅助启动类 降低服务器开发难度
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    // 类似NIO中serverSocketChannel
                    .channel(NioServerSocketChannel.class)
                    // 配置TCP参数
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    // 设置tcp缓冲区
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    // 设置发送缓冲大小
                    .option(ChannelOption.SO_SNDBUF, 32 * 1024)
                    // 这是接收缓冲大小
                    .option(ChannelOption.SO_RCVBUF, 32 * 1024)
                    // 保持连接
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        // 网络事件处理器
                        @Override
                        protected void initChannel(SocketChannel channel) {
                            // 增加自定义的编码器和解码器
                            channel.pipeline()
                                    .addLast(new StringEncoder())
                                    .addLast(new StringDecoder())
                                    // 服务端的处理器
                                    .addLast(new ServerHandler());
                        }
                    });

            // 服务器启动后 绑定监听端口 同步等待成功 主要用于异步操作的通知回调 回调处理用的ChildChannelHandler
            ChannelFuture f = serverBootstrap.bind(port).sync();
            System.out.println("Server启动");
            // 等待服务端监听端口关闭
            f.channel().closeFuture().sync();
        } finally {
            // 优雅退出 释放线程池资源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
            System.out.println("服务器优雅的释放了线程资源…");
        }
    }
}

然后对应Handler:

/**
 * 服务器端处理器
 *
 * 来源:公众号,程序新视界
 * @author sec
 * @version 1.0
 * @date 2020/12/22
 **/
public class ServerHandler extends ChannelHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // 接受客户端的数据
        String body = (String) msg;
        System.out.println(body.length());
        System.out.println("收到Client信息:[" + body +"]");
        // 服务端,回写数据给客户端,直接回写整形的数据
        String data = "Hello ,I am Server …";
        ctx.writeAndFlush(data);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        ctx.close();
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }
}

client端对应请求:

/**
 * 客户端
 *
 * 来源:公众号,程序新视界
 * @author sec
 * @version 1.0
 * @date 2020/12/22
 **/
public class Client {

    public static void main(String[] args) throws Exception {
        new Client().connect(9998, "127.0.0.1");
    }

    /**
     * 连接服务器
     */
    public void connect(int port, String host) throws Exception {
        // 配置客户端NIO线程组
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            // 客户端辅助启动类 对客户端配置
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        // 网络事件处理器
                        @Override
                        protected void initChannel(SocketChannel channel) {
                            // 增加自定义的编码器和解码器
                            channel.pipeline()
                                    .addLast(new StringEncoder())
                                    .addLast(new StringDecoder())
                                    // 客户端的处理器
                                    .addLast(new ClientHandler());
                        }
                    });
            // 异步链接服务器 同步等待链接成功
            ChannelFuture f = b.connect(host, port).sync();
            System.out.println(f);
            // 发送消息
            Thread.sleep(1000);
            f.channel().writeAndFlush("Hello ");
            f.channel().writeAndFlush("World ");
            Thread.sleep(2000);
            f.channel().writeAndFlush("Netty ");
            // 等待链接关闭
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
            System.out.println("客户端优雅的释放了线程资源...");
        }
    }
}

client对应的handler:

/**
 * 客户端处理器
 *
 * 来源:公众号,程序新视界
 * @author sec
 * @version 1.0
 * @date 2020/12/22
 **/
public class ClientHandler extends ChannelHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        try {
            String body = (String) msg;
            System.out.println("Client :" + body);

            // 只是读数据,没有写数据的话
            // 需要自己手动的释放的消息
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        ctx.close();
    }

}

然后依次运行Server和Client即可进行通信。



Netty自定义编码器MessageToMessageEncoder类插图

关注公众号:程序新视界,一个让你软实力、硬技术同步提升的平台

除非注明,否则均为程序新视界原创文章,转载必须以链接形式标明本文链接

本文链接:https://choupangxia.com/2020/12/23/netty-messagetomessageencoder/