Netty自定义编码器MessageToMessageEncoder类
在Netty的实践中很多场景都会用到自定义报文,那么MessageToMessageEncoder编码器,几乎比如会遇到。
从类名层面来说,第二个Message可以理解为任意一个对象。如果是使用ByteBuf对象的话,就和MessageToByte的原理一样。
使用时,要在MessageToMessageDecoder\<ByteBuf>的解码器里面对ByteBuf类型的对象进行解码操作,解析为所需的格式或报文。
本文以字符串的编码和解析为实例(关注公众号:程序新视界,回复“1006”,接收完整实例和更多代码)来进行讲解,通过自定义编码器和解码器,便可以在Netty中便直接发送和接收String类型的数据。
解码器的本质就是将原始字符数据与自定义的消息对象进行转换。网络中数据的传输都是以字节码形式的,client编码发送到server,server进行解码,反之一样。因此,编码器和解码器往往是成对出现的。
了解了上述基本原理,下面来看看具体实现代码。
首先看编码器部分StringEncoder:
/** * 自定义字符串编码器 * 来源:公众号,程序新视界 * @author sec * @version 1.0 * @date 2020/12/22 **/ public class StringEncoder extends MessageToMessageEncoder<CharSequence> { private final Charset charset; public StringEncoder() { this(Charset.defaultCharset()); } public StringEncoder(Charset charset) { if (charset == null) { throw new NullPointerException("charset"); } this.charset = charset; } @Override protected void encode(ChannelHandlerContext ctx, CharSequence msg, List<Object> out) { // 发送消息为空直接返回 if (msg.length() == 0) { return; } out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg), this.charset)); } }
对应的解码器StringDecoder:
/** * 字符串解码器 * 来源:公众号,程序新视界 * @author sec * @version 1.0 * @date 2020/12/22 **/ public class StringDecoder extends MessageToMessageDecoder<ByteBuf> { private final Charset charset; public StringDecoder() { this(Charset.defaultCharset()); } public StringDecoder(Charset charset) { if (charset == null) { throw new NullPointerException("charset"); } this.charset = charset; } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) { out.add(msg.toString(this.charset)); } }
其中,编码器和解码器都需要在client和server中进行配置指定,先看server端代码:
/** * 服务器端 * * 来源:公众号,程序新视界 * @author sec * @version 1.0 * @date 2020/12/22 **/ public class Server { public static void main(String[] args) throws Exception { int port = 9998; new Server().bind(port); } public void bind(int port) throws Exception { // 服务器线程组 用于网络事件的处理 一个用于服务器接收客户端的连接 // 另一个线程组用于处理SocketChannel的网络读写 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // NIO服务器端的辅助启动类 降低服务器开发难度 ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) // 类似NIO中serverSocketChannel .channel(NioServerSocketChannel.class) // 配置TCP参数 .option(ChannelOption.SO_BACKLOG, 1024) // 设置tcp缓冲区 .option(ChannelOption.SO_BACKLOG, 1024) // 设置发送缓冲大小 .option(ChannelOption.SO_SNDBUF, 32 * 1024) // 这是接收缓冲大小 .option(ChannelOption.SO_RCVBUF, 32 * 1024) // 保持连接 .option(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>() { // 网络事件处理器 @Override protected void initChannel(SocketChannel channel) { // 增加自定义的编码器和解码器 channel.pipeline() .addLast(new StringEncoder()) .addLast(new StringDecoder()) // 服务端的处理器 .addLast(new ServerHandler()); } }); // 服务器启动后 绑定监听端口 同步等待成功 主要用于异步操作的通知回调 回调处理用的ChildChannelHandler ChannelFuture f = serverBootstrap.bind(port).sync(); System.out.println("Server启动"); // 等待服务端监听端口关闭 f.channel().closeFuture().sync(); } finally { // 优雅退出 释放线程池资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); System.out.println("服务器优雅的释放了线程资源…"); } } }
然后对应Handler:
/** * 服务器端处理器 * * 来源:公众号,程序新视界 * @author sec * @version 1.0 * @date 2020/12/22 **/ public class ServerHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { // 接受客户端的数据 String body = (String) msg; System.out.println(body.length()); System.out.println("收到Client信息:[" + body +"]"); // 服务端,回写数据给客户端,直接回写整形的数据 String data = "Hello ,I am Server …"; ctx.writeAndFlush(data); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { ctx.close(); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } }
client端对应请求:
/** * 客户端 * * 来源:公众号,程序新视界 * @author sec * @version 1.0 * @date 2020/12/22 **/ public class Client { public static void main(String[] args) throws Exception { new Client().connect(9998, "127.0.0.1"); } /** * 连接服务器 */ public void connect(int port, String host) throws Exception { // 配置客户端NIO线程组 EventLoopGroup group = new NioEventLoopGroup(); try { // 客户端辅助启动类 对客户端配置 Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { // 网络事件处理器 @Override protected void initChannel(SocketChannel channel) { // 增加自定义的编码器和解码器 channel.pipeline() .addLast(new StringEncoder()) .addLast(new StringDecoder()) // 客户端的处理器 .addLast(new ClientHandler()); } }); // 异步链接服务器 同步等待链接成功 ChannelFuture f = b.connect(host, port).sync(); System.out.println(f); // 发送消息 Thread.sleep(1000); f.channel().writeAndFlush("Hello "); f.channel().writeAndFlush("World "); Thread.sleep(2000); f.channel().writeAndFlush("Netty "); // 等待链接关闭 f.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); System.out.println("客户端优雅的释放了线程资源..."); } } }
client对应的handler:
/** * 客户端处理器 * * 来源:公众号,程序新视界 * @author sec * @version 1.0 * @date 2020/12/22 **/ public class ClientHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { try { String body = (String) msg; System.out.println("Client :" + body); // 只是读数据,没有写数据的话 // 需要自己手动的释放的消息 } finally { ReferenceCountUtil.release(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { ctx.close(); } }
然后依次运行Server和Client即可进行通信。
关注公众号:程序新视界,一个让你软实力、硬技术同步提升的平台
除非注明,否则均为程序新视界原创文章,转载必须以链接形式标明本文链接
本文链接:https://choupangxia.com/2020/12/23/netty-messagetomessageencoder/