1、对于MessageToMessageEncoder的理解

MessageToMessageEncoder编码器,这里的第二个Message可以理解为任意一个对象。如果是使用ByteBuf对象的话,就和之前的MessageToByte的原理是一样的了。所以要在MessageToMessageDecoder<ByteBuf>的解码器里面,手动的指定,是对ByteBuf类型的对象进行解码的操作。

2、编写MyStringEncoder编码器和MyStringDecoder解码器,以便于,Netty中可以直接发送和接收String类型的数据

2.1  MyStringEncoder编码器的代码

[java] view plaincopyprint?
  1. import io.netty.buffer.ByteBufUtil;
  2. import io.netty.channel.ChannelHandlerContext;
  3. import io.netty.handler.codec.MessageToMessageEncoder;
  4. import java.nio.CharBuffer;
  5. import java.nio.charset.Charset;
  6. import java.util.List;
  7. public class MyStringEncoder extends MessageToMessageEncoder<CharSequence> {
  8. private final Charset charset;
  9. public MyStringEncoder() {
  10. this(Charset.defaultCharset());
  11. }
  12. public MyStringEncoder(Charset charset) {
  13. if (charset == null) {
  14. throw new NullPointerException(“charset”);
  15. }
  16. this.charset = charset;
  17. }
  18. protected void encode(ChannelHandlerContext ctx, CharSequence msg,
  19. List<Object> out) throws Exception {
  20. if (msg.length() == 0) {
  21. return;
  22. }
  23. out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg),
  24. this.charset));
  25. }
  26. }

import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.util.List;public class MyStringEncoder extends MessageToMessageEncoder<CharSequence> {private final Charset charset;public MyStringEncoder() {this(Charset.defaultCharset());}public MyStringEncoder(Charset charset) {if (charset == null) {throw new NullPointerException("charset");}this.charset = charset;}protected void encode(ChannelHandlerContext ctx, CharSequence msg,List<Object> out) throws Exception {if (msg.length() == 0) {return;}out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg),this.charset));}
}

2.2  MyStringDecoder解码器的代码

[java] view plaincopyprint?
  1. import java.nio.charset.Charset;
  2. import java.util.List;
  3. import io.netty.buffer.ByteBuf;
  4. import io.netty.channel.ChannelHandlerContext;
  5. import io.netty.handler.codec.MessageToMessageDecoder;
  6. public class MyStringDecoder extends MessageToMessageDecoder<ByteBuf> {
  7. private final Charset charset;
  8. public MyStringDecoder() {
  9. this(Charset.defaultCharset());
  10. }
  11. public MyStringDecoder(Charset charset) {
  12. if (charset == null) {
  13. throw new NullPointerException(“charset”);
  14. }
  15. this.charset = charset;
  16. }
  17. protected void decode(ChannelHandlerContext ctx, ByteBuf msg,
  18. List<Object> out) throws Exception {
  19. out.add(msg.toString(this.charset));
  20. }
  21. }

import java.nio.charset.Charset;
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;public class MyStringDecoder extends MessageToMessageDecoder<ByteBuf> {private final Charset charset;public MyStringDecoder() {this(Charset.defaultCharset());}public MyStringDecoder(Charset charset) {if (charset == null) {throw new NullPointerException("charset");}this.charset = charset;}protected void decode(ChannelHandlerContext ctx, ByteBuf msg,List<Object> out) throws Exception {out.add(msg.toString(this.charset));}
}

3、服务端的实现

[java] view plaincopyprint?
  1. import io.netty.bootstrap.ServerBootstrap;
  2. import io.netty.channel.ChannelFuture;
  3. import io.netty.channel.ChannelInitializer;
  4. import io.netty.channel.ChannelOption;
  5. import io.netty.channel.EventLoopGroup;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.SocketChannel;
  8. import io.netty.channel.socket.nio.NioServerSocketChannel;
  9. public class Server {
  10. public void bind(int port) throws Exception {
  11. // 服务器线程组 用于网络事件的处理 一个用于服务器接收客户端的连接
  12. // 另一个线程组用于处理SocketChannel的网络读写
  13. EventLoopGroup bossGroup = new NioEventLoopGroup();
  14. EventLoopGroup workerGroup = new NioEventLoopGroup();
  15. try {
  16. // NIO服务器端的辅助启动类 降低服务器开发难度
  17. ServerBootstrap serverBootstrap = new ServerBootstrap();
  18. serverBootstrap.group(bossGroup, workerGroup)
  19. .channel(NioServerSocketChannel.class)// 类似NIO中serverSocketChannel
  20. .option(ChannelOption.SO_BACKLOG, 1024)// 配置TCP参数
  21. .option(ChannelOption.SO_BACKLOG, 1024) // 设置tcp缓冲区
  22. .option(ChannelOption.SO_SNDBUF, 32 * 1024) // 设置发送缓冲大小
  23. .option(ChannelOption.SO_RCVBUF, 32 * 1024) // 这是接收缓冲大小
  24. .option(ChannelOption.SO_KEEPALIVE, true) // 保持连接
  25. .childHandler(new ChildChannelHandler());// 最后绑定I/O事件的处理类
  26. // 处理网络IO事件
  27. // 服务器启动后 绑定监听端口 同步等待成功 主要用于异步操作的通知回调 回调处理用的ChildChannelHandler
  28. ChannelFuture f = serverBootstrap.bind(port).sync();
  29. System.out.println(”Server启动”);
  30. // 等待服务端监听端口关闭
  31. f.channel().closeFuture().sync();
  32. } finally {
  33. // 优雅退出 释放线程池资源
  34. bossGroup.shutdownGracefully();
  35. workerGroup.shutdownGracefully();
  36. System.out.println(”服务器优雅的释放了线程资源…”);
  37. }
  38. }
  39. /**
  40. * 网络事件处理器
  41. */
  42. private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
  43. @Override
  44. protected void initChannel(SocketChannel ch) throws Exception {
  45. // 增加自定义的编码器和解码器
  46. ch.pipeline().addLast(new MyStringEncoder());
  47. ch.pipeline().addLast(new MyStringDecoder());
  48. // 服务端的处理器
  49. ch.pipeline().addLast(new ServerHandler());
  50. }
  51. }
  52. public static void main(String[] args) throws Exception {
  53. int port = 9998;
  54. new Server().bind(port);
  55. }
  56. }

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;public class Server {public void bind(int port) throws Exception {// 服务器线程组 用于网络事件的处理 一个用于服务器接收客户端的连接// 另一个线程组用于处理SocketChannel的网络读写EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {// NIO服务器端的辅助启动类 降低服务器开发难度ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)// 类似NIO中serverSocketChannel.option(ChannelOption.SO_BACKLOG, 1024)// 配置TCP参数.option(ChannelOption.SO_BACKLOG, 1024) // 设置tcp缓冲区.option(ChannelOption.SO_SNDBUF, 32 * 1024) // 设置发送缓冲大小.option(ChannelOption.SO_RCVBUF, 32 * 1024) // 这是接收缓冲大小.option(ChannelOption.SO_KEEPALIVE, true) // 保持连接.childHandler(new ChildChannelHandler());// 最后绑定I/O事件的处理类// 处理网络IO事件// 服务器启动后 绑定监听端口 同步等待成功 主要用于异步操作的通知回调 回调处理用的ChildChannelHandlerChannelFuture f = serverBootstrap.bind(port).sync();System.out.println("Server启动");// 等待服务端监听端口关闭f.channel().closeFuture().sync();} finally {// 优雅退出 释放线程池资源bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();System.out.println("服务器优雅的释放了线程资源...");}}/*** 网络事件处理器*/private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {// 增加自定义的编码器和解码器ch.pipeline().addLast(new MyStringEncoder());ch.pipeline().addLast(new MyStringDecoder());// 服务端的处理器ch.pipeline().addLast(new ServerHandler());}}public static void main(String[] args) throws Exception {int port = 9998;new Server().bind(port);}
}

4、服务端Handler的实现

[java] view plaincopyprint?
  1. import io.netty.channel.ChannelHandlerAdapter;
  2. import io.netty.channel.ChannelHandlerContext;
  3. public class ServerHandler extends ChannelHandlerAdapter {
  4. @Override
  5. public void channelRead(ChannelHandlerContext ctx, Object msg)
  6. throws Exception {
  7. // 接受客户端的数据
  8. String body = (String) msg;
  9. System.out.println(”Client :” + body);
  10. // 服务端,回写数据给客户端
  11. // 直接回写整形的数据
  12. String data = ”Hello ,I am Server …”;
  13. ctx.writeAndFlush(data);
  14. }
  15. @Override
  16. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
  17. throws Exception {
  18. ctx.close();
  19. }
  20. @Override
  21. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  22. ctx.flush();
  23. }
  24. }

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;public class ServerHandler extends ChannelHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {// 接受客户端的数据String body = (String) msg;System.out.println("Client :" + body);// 服务端,回写数据给客户端// 直接回写整形的数据String data = "Hello ,I am Server ...";ctx.writeAndFlush(data);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {ctx.close();}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.flush();}
}

5、客户端的实现

[java] view plaincopyprint?
  1. import io.netty.bootstrap.Bootstrap;
  2. import io.netty.channel.ChannelFuture;
  3. import io.netty.channel.ChannelInitializer;
  4. import io.netty.channel.ChannelOption;
  5. import io.netty.channel.EventLoopGroup;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.SocketChannel;
  8. import io.netty.channel.socket.nio.NioSocketChannel;
  9. public class Client {
  10. /**
  11. * 连接服务器
  12. *
  13. * @param port
  14. * @param host
  15. * @throws Exception
  16. */
  17. public void connect(int port, String host) throws Exception {
  18. // 配置客户端NIO线程组
  19. EventLoopGroup group = new NioEventLoopGroup();
  20. try {
  21. // 客户端辅助启动类 对客户端配置
  22. Bootstrap b = new Bootstrap();
  23. b.group(group)//
  24. .channel(NioSocketChannel.class)//
  25. .option(ChannelOption.TCP_NODELAY, true)//
  26. .handler(new MyChannelHandler());//
  27. // 异步链接服务器 同步等待链接成功
  28. ChannelFuture f = b.connect(host, port).sync();
  29. System.out.println(f);
  30. // 发送消息
  31. Thread.sleep(1000);
  32. f.channel().writeAndFlush(”777”);
  33. f.channel().writeAndFlush(”666”);
  34. Thread.sleep(2000);
  35. f.channel().writeAndFlush(”888”);
  36. // 等待链接关闭
  37. f.channel().closeFuture().sync();
  38. } finally {
  39. group.shutdownGracefully();
  40. System.out.println(”客户端优雅的释放了线程资源…”);
  41. }
  42. }
  43. /**
  44. * 网络事件处理器
  45. */
  46. private class MyChannelHandler extends ChannelInitializer<SocketChannel> {
  47. @Override
  48. protected void initChannel(SocketChannel ch) throws Exception {
  49. // 增加自定义的编码器和解码器
  50. ch.pipeline().addLast(new MyStringEncoder());
  51. ch.pipeline().addLast(new MyStringDecoder());
  52. // 客户端的处理器
  53. ch.pipeline().addLast(new ClientHandler());
  54. }
  55. }
  56. public static void main(String[] args) throws Exception {
  57. new Client().connect(9998, “127.0.0.1”);
  58. }
  59. }

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;public class Client {/*** 连接服务器* * @param port* @param host* @throws Exception*/public void connect(int port, String host) throws Exception {// 配置客户端NIO线程组EventLoopGroup group = new NioEventLoopGroup();try {// 客户端辅助启动类 对客户端配置Bootstrap b = new Bootstrap();b.group(group)//.channel(NioSocketChannel.class)//.option(ChannelOption.TCP_NODELAY, true)//.handler(new MyChannelHandler());//// 异步链接服务器 同步等待链接成功ChannelFuture f = b.connect(host, port).sync();System.out.println(f);// 发送消息Thread.sleep(1000);f.channel().writeAndFlush("777");f.channel().writeAndFlush("666");Thread.sleep(2000);f.channel().writeAndFlush("888");// 等待链接关闭f.channel().closeFuture().sync();} finally {group.shutdownGracefully();System.out.println("客户端优雅的释放了线程资源...");}}/*** 网络事件处理器*/private class MyChannelHandler extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {// 增加自定义的编码器和解码器ch.pipeline().addLast(new MyStringEncoder());ch.pipeline().addLast(new MyStringDecoder());// 客户端的处理器ch.pipeline().addLast(new ClientHandler());}}public static void main(String[] args) throws Exception {new Client().connect(9998, "127.0.0.1");}
}

6、客户端Handler的实现

[java] view plaincopyprint?
  1. import io.netty.channel.ChannelHandlerAdapter;
  2. import io.netty.channel.ChannelHandlerContext;
  3. import io.netty.util.ReferenceCountUtil;
  4. public class ClientHandler extends ChannelHandlerAdapter {
  5. @Override
  6. public void channelRead(ChannelHandlerContext ctx, Object msg)
  7. throws Exception {
  8. try {
  9. String body = (String) msg;
  10. System.out.println(”Client :” + body);
  11. // 只是读数据,没有写数据的话
  12. // 需要自己手动的释放的消息
  13. } finally {
  14. ReferenceCountUtil.release(msg);
  15. }
  16. }
  17. @Override
  18. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
  19. throws Exception {
  20. ctx.close();
  21. }
  22. }

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;public class ClientHandler extends ChannelHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {try {String body = (String) msg;System.out.println("Client :" + body);// 只是读数据,没有写数据的话// 需要自己手动的释放的消息} finally {ReferenceCountUtil.release(msg);}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {ctx.close();}}

7、关于直接发送和接收String类型的编码

7.1 直接发送String类型的数据

7.2 直接接收String类型的数据

Netty之自定义编码器MessageToMessageEncoder类相关推荐

  1. 【Netty】自定义解码器、编码器、编解码器(十五)

    文章目录 前言 一.自定义基于换行的解码器 1.1 LineBasedFrameDecoder 类 1.2 定义解码器 1.3 定义 ChannelHandler 1.4 定义 ChannelInit ...

  2. 07 接头暗语:如何利用 Netty 实现自定义协议通信?

    文章目录 07 接头暗语:如何利用 Netty 实现自定义协议通信? 通信协议设计 1. 魔数 2. 协议版本号 3. 序列化算法 4. 报文类型 5. 长度域字段 6. 请求数据 7. 状态 8. ...

  3. 07 | 接头暗语:如何利用 Netty 实现自定义协议通信?

    既然是网络编程,自然离不开通信协议,应用层之间通信需要实现各种各样的网络协议.在项目开发的过程中,我们就需要去构建满足自己业务场景的应用层协议.在上节课中我们介绍了如何使用网络协议解决 TCP 拆包/ ...

  4. Netty实现自定义简单的编解码器(二)

    2019独角兽企业重金招聘Python工程师标准>>> Netty实现自定义简单的编解码器(二) 关于编解码器请参见:http://my.oschina.net/xinxingege ...

  5. Netty实现自定义协议

    关于协议,使用最为广泛的是HTTP协议,但是在一些服务交互领域,其使用则相对较少,主要原因有三方面: HTTP协议会携带诸如header和cookie等信息,其本身对字节的利用率也较低,这使得HTTP ...

  6. python 多线程 类_Python中如何自定义一个多线程类呢?

    摘要: 下文讲述Python中自定义一个多线程类的方法分享,如下所示: 实现思路: 1.定义一个类继承threading.Thread 2.在自定义类中构造函数重写run方法 例: Python3中自 ...

  7. c++自定义的数据库类

    自定义的数据库类:Class DataBase 收藏  class DataBase { private: _ConnectionPtr m_pConnection; _CommandPtr m_pC ...

  8. 安卓开发37:自定义的HorizontalScrollView类,使其pageScroll的时候焦点不选中

    自定义一个HorizontalScrollView类,主要为了让这个HorizontalScrollView不能鼠标点击,不能左右按键,并且没有焦点. public class ImageMoveHo ...

  9. Spring Boot——自定义Web配置类后无法访问/static文件夹下静态资源

    问题描述 自定义Web配置类后无法访问 /static文件夹下静态资源. 已加相关依赖包. 官方文档 Spring MVC Auto Configuration Maven <dependenc ...

最新文章

  1. python3:利用SMTP协议发送QQ邮件+附件
  2. 红包规则_“科普闯关100%夺红包”游戏规则升级了!速速来看!
  3. SparkSql官方文档中文翻译(java版本)
  4. 巧妙设备MTU的大小,轻松提网速
  5. 再论EM算法的收敛性和K-Means的收敛性
  6. 网络爬虫--12.【XPath实战】获取百度贴吧中的图片
  7. Centos7 开机启动汇总
  8. 4.9.5 通用注释
  9. 单片机汉字点阵c语言程序,51单片机C语言多种点阵屏驱动程序(开发软件为keil C...
  10. Linux进程管理四大‘名捕’
  11. 创客使用Fusion 360 - 认识Fusion 360
  12. com.thoughtworks.xstream.security.ForbiddenClassException com.thoughtworks.xstream 1.4.18
  13. JS提升:Promise中reject与then之间的关系
  14. 【HISI系列】之HISI芯片码率控制使用说明
  15. cocos creator 3D | 拇指投篮 | 3D项目入门实战
  16. 数据挖掘综合应用:数据预处理代码实战
  17. 打造高速浏览器,逐鹿搜索市场,搜狗高速浏览器2.0值得期待
  18. [开源 .NET 跨平台 数据采集 爬虫框架: DotnetSpider] [一] 初衷与架构设计 - ModestMT.Zou - 博客园...
  19. 老板太会做生意,只做了一件事,就让这家刚开业的餐厅人气暴涨
  20. 论文笔记:Geo-Neus: Geometry-Consistent Neural Implicit Surfaces Learning for Multi-view Reconstruction

热门文章

  1. 逆向工程核心原理学习笔记(十):IA-32寄存器基本讲解
  2. extern用法详解
  3. Python UDP聊天器
  4. 单元测试之JUnit 5 参数化测试使用手册
  5. 张霖峰:AV1和VVC的格局将在2023年后明朗
  6. Facebook批量优化360照片
  7. 数据结构与算法之猫狗队列
  8. 一文掌握开发利器:正则表达式
  9. python 知识点总结
  10. 面试官:看你简历写了熟悉Kafka,它为什么速度会这么快?