文章目录

  • 前言
  • 服务端
  • 启动方式

前言

最近在做的项目有一个需要对接TCP的功能,网上查了一下,决定用netty来实现。

服务端

这次的需求只需要做一个服务端,话不多说,直接上代码

pom

        <dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.18.Final</version></dependency>

Netty.java

public class Netty {private static Logger log = LoggerFactory.getLogger(Netty.class);// 请求队列长度private static Integer TcpWaitSize = 1024;// 运行系统private static String system = "linux";private static NettyServerInit nettyServerChannelInitializer = new NettyServerInit();private ConcurrentHashMap<Integer, ChannelFuture> futures = new ConcurrentHashMap<Integer, ChannelFuture>();private ConcurrentHashMap<Integer, List<EventLoopGroup>> eventLoopGroups = new ConcurrentHashMap<Integer, List<EventLoopGroup>>();private ServerBootstrap serverBootstrap = new ServerBootstrap();private EventLoopGroup bossGroup = null;private EventLoopGroup workerGroup = null;private LinkedList<Integer> ports = new LinkedList<>();public void addPort(Integer port) {ports.add(port);}/*** 停止监听指定端口*/public int stop(int port) {if (futures.containsKey(port)) {ChannelFuture future = futures.get(port);future.channel().closeFuture();futures.remove(port);List<EventLoopGroup> list = this.eventLoopGroups.get(port);for (int i = 0; i < list.size(); i++) {list.get(i).shutdownGracefully();}eventLoopGroups.remove(port);log.info(">>>>>>>>>> 关闭对端口<{}>的监听", port);}return port;}/*** 停止监听所有端口*/public void stopAll() {ports.removeIf(s -> s == stop(s));}/*** 启动监听** @param ports 端口通过‘,’拼接*/public void start(String ports) {log.info(">>>>>>>>>>>>>>> netty server 启动中......");String[] temp = ports.split(",");try {for (int i = 0; i < temp.length; i++) {int port = Integer.parseInt(temp[i]);start(port);}} catch (Exception e) {log.error(">>>>>>>>>> netty server 启动异常{}", e);} finally {}log.info(">>>>>>>>>>>>>>> netty server 启动成功");}/*** 监听某一端口*/public void start(int port) {List<EventLoopGroup> list = baseData();ChannelFuture future = serverBootstrap.bind(port);// 关联端口和通道futures.put(port, future);// 关联端口和工作组eventLoopGroups.put(port, list);future.addListener(f -> {if (f.isSuccess()) {log.info(">>>>>>>>>> netty server 监听端口<{}>成功", port);} else {log.info(">>>>>>>>>> netty server 监听端口<{}>失败", port);}});addPort(port);}public List<EventLoopGroup> baseData() {//存放NioServerSocketChannel.class 或者 EpollServerSocketChannel.classList<Object> serverSocketChannelList = new ArrayList();List<EventLoopGroup> eventLoopGroups;//Group:群组,Loop:循环,Event:事件//Netty内部都是通过线程在处理各种数据,EventLoopGroup就是用来管理调度他们的,注册Channel,管理他们的生命周期。//NioEventLoopGroup是一个处理I/O操作的多线程事件循环String name = System.getProperties().getProperty("os.name").toLowerCase();if (name.contains(system)) {//如果程序在linux上运行,可以使用EpollEventLoopGroup,从而获得更好的性能、更少的GC和更高级的特性,而这些特性只在linux上可用bossGroup = new EpollEventLoopGroup();workerGroup = new EpollEventLoopGroup();serverSocketChannelList.add(EpollServerSocketChannel.class);eventLoopGroups = Stream.of(bossGroup, workerGroup).collect(Collectors.toList());log.info(">>>>>>>>>> netty server 使用epoll模式(仅linux系统使用)");} else {bossGroup = new NioEventLoopGroup();workerGroup = new NioEventLoopGroup();serverSocketChannelList.add(NioServerSocketChannel.class);eventLoopGroups = Stream.of(bossGroup, workerGroup).collect(Collectors.toList());log.info(">>>>>>>>>> netty server 使用nio模式");}try {ServerBootstrap sbs = new ServerBootstrap();this.serverBootstrap = sbs.group(bossGroup, workerGroup)  //绑定线程池.channel((Class<? extends ServerChannel>) serverSocketChannelList.get(0))  // 指定使用的channel.option(ChannelOption.SO_BACKLOG, TcpWaitSize) //当处理线程都忙碌时候,临时存放已完成三次握手的请求的队列的最大长度.childOption(ChannelOption.SO_KEEPALIVE, true)  //保持长连接.childOption(ChannelOption.TCP_NODELAY, true) //true防止数据传输延迟 如果false的话会缓冲数据达到一定量在flush,降低系统网络调用(具体场景)//FixedRecvByteBufAllocator:固定长度的接收缓冲区分配器,由它分配的ByteBuf长度都是固定大小的,并不会根据实际数据报的大小动态收缩。但是,如果容量不足,支持动态扩展。动态扩展是Netty ByteBuf的一项基本功能,与ByteBuf分配器的实现没有关系;//AdaptiveRecvByteBufAllocator:容量动态调整的接收缓冲区分配器,它会根据之前Channel接收到的数据报大小进行计算,如果连续填充满接收缓冲区的可写空间,则动态扩展容量。如果连续2次接收到的数据报都小于指定值,则收缩当前的容量,以节约内存。.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(64, 1024, 65535)).option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) //使用内存池.childHandler(nettyServerChannelInitializer);  //channel初始化} catch (Exception e) {e.printStackTrace();}return eventLoopGroups;}
}

NettyChannelUtil.java

public class NettyChannelUtil {//活跃通道列表  channelId:channelpublic static ConcurrentHashMap<String, Channel> sessionChannelMap = new ConcurrentHashMap<String, Channel>();//桩编号和通道id映射  pileCode:channelIdpublic static ConcurrentHashMap<String, String> map = new ConcurrentHashMap<String, String>();//保存连接时间   channelId:timepublic static ConcurrentHashMap<String, String> mapTime = new ConcurrentHashMap<String, String>();/*** 获取所有连接*/public static Map<String, Channel> channelAll() {return sessionChannelMap;}/*** 获取所有连接的时间*/public static Map<String, String> channelConnectTimeAll() {return mapTime;}/*** 获取所有映射的桩编号*/public static Map<String, String> channelPileCodeAll() {return map;}/*** 获取连接时间*/public static String getTime(String channelId) {return mapTime.get(channelId);}/*** 获取连接对应的桩编号*/public static String getPileCode(String channelId) {return map.get(channelId);}/*** 获取连接数*/public static Integer getConnectNumber() {return sessionChannelMap.size();}/*** 添加连接*/public static boolean addConnect(Channel channel) {String channelId = channel.id().asLongText();//作为主动请求的依据,若是重连会覆盖channelAll().put(channelId, channel);channelConnectTimeAll().put(channelId, LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));return true;}/*** 移除连接*/public static boolean deleteConnect(Channel channel) {String channelId = channel.id().asLongText();channelAll().remove(channelId);channelConnectTimeAll().remove(channelId);channelPileCodeAll().remove(channelId);return true;}/*** @return java.util.List<java.lang.String>* @Author xiongchuan* @Description 根据value获取值* @Date 2020/5/7 23:44* @Param [value]**/public static List<String> getByValue(String value) {List<String> list = new ArrayList<>();for (String key : map.keySet()) {if (map.get(key).equals(value)) {list.add(key);}}return list;}}

NettyHandler.java

@ChannelHandler.Sharable
public class NettyHandler extends ChannelInboundHandlerAdapter {private static Logger log = LoggerFactory.getLogger(NettyHandler.class);/*** channelAction* channel 通道 action 活跃的* 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据** @param ctx* @throws Exception*/@Overridepublic void channelActive(ChannelHandlerContext ctx) {log.info("================通道活跃中....========================hashcode值:{}", this.hashCode());try {NettyChannelUtil.addConnect(ctx.channel());} catch (Exception e) {log.info("未知错误!{}", e);}}/*** channelInactive* channel 通道 Inactive 不活跃的* 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据** @param ctx*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) {log.warn("--------Netty Disconnect Client IP is :{}  {} --------", ctx.channel().id().asShortText(), ctx.channel().remoteAddress());removeChannel(ctx.channel());ctx.close();}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) {log.info("================Netty读取信息已经完成!========================");ctx.flush();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {log.error("--------Netty Exception ExceptionCaught :{}  {} =======================\n", ctx.channel().id().asShortText(), cause.getMessage());cause.printStackTrace();ctx.close();}/*** 检测指定时间内无读写操作时触发,此设置在pipeline链路中设置*/@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {//判断事件是不是IdleStateEvent事件,然后再判断是否为读空闲or写空闲or读写空闲,是就做相应处理,不是就把事件透传给下一个处理类if (evt instanceof IdleStateEvent) {IdleStateEvent event = (IdleStateEvent) evt;if (event.state() == IdleState.READER_IDLE) {ctx.close();log.info("================服务器检测到未在指定时间内接收到客户端【{}】的数据,关闭此通道!\r\n", ctx.channel().id().asShortText());}} else {super.userEventTriggered(ctx, evt);}}/*** 移除通道** @param channel*/public void removeChannel(Channel channel) {try {//去除不活跃的通道映射NettyChannelUtil.deleteConnect(channel);log.info("--------当前活跃通道总数:{}", NettyChannelUtil.getConnectNumber());} catch (Exception e) {log.info("未知错误!error:{}", e);} finally {}}/*** 功能:读取服务器发送过来的信息** @param ctx* @param msg* @throws Exception*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {log.info("================通道读取服务器发送过来的信息========================");try {//String data = (String) msg;log.info("原始报文:{}", msg);byte req[]=(byte[]) msg;String message=ConversionUtil.bytesToHexString(req);String clientId = ctx.channel().id().asLongText();/***  使用不同于nioEventLoopGroup的线程池去处理耗时操作,避免阻塞nioEventLoopGroup线程池*/ThreadUtil.execAsync(() -> {handleMessageStart(ctx, req,message, clientId);});} catch (Exception e) {log.info("================通道读取消息异常:{}========================", e.getMessage());} finally {ReferenceCountUtil.release(msg);}}//处理业务逻辑之前的判断private void handleMessageStart(ChannelHandlerContext ctx, byte[] req,String data, String clientId) {//do something}}

注:这里是因为是十六进制的byte数组所以这么转换,应用的时候需要根据实际需求来
NettyServerInit.java

public class NettyServerInit extends ChannelInitializer<SocketChannel> {private static final Logger LOGGER = LoggerFactory.getLogger(NettyServerInit.class);private static final NettyHandler serverHandler = new NettyHandler();// redis设置过期时间public static Integer redisTimeout = 100;// 连接断开时间public static Integer readerIdleTime = 300;private static Integer writerIdleTime = 0;private static Integer allIdleTime = 0;// 解决粘包分隔符public static String delimiter = "_$";@Overrideprotected void initChannel(SocketChannel channel) {InetSocketAddress socketAdd = channel.remoteAddress();LOGGER.info("================检测到socket客户端链接到本服务器, IP为:" + socketAdd.getAddress().getHostAddress() + ", Port为:" + socketAdd.getPort() + "  hashCode:" + this.hashCode() + "========================");ChannelPipeline pipeline = channel.pipeline();//心跳设置pipeline.addLast(new IdleStateHandler(readerIdleTime, writerIdleTime, allIdleTime, TimeUnit.SECONDS));//编码pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));//解码//pipeline.addLast(new DelimiterBasedFrameDecoder(1024*1024, Unpooled.wrappedBuffer(delimiter.getBytes())));pipeline.addLast(new NettyDecoder());//添加处理类pipeline.addLast(serverHandler);}
}

注:NettyDecoder是自己实现的解码校验类,继承自ByteToMessageDecoder就可以

启动方式

        NETTY=new Netty();NETTY.start(8080);

使用netty实现简单tcp服务端相关推荐

  1. Netty实例-简单的服务端-client实现,凝视具体

           书籍推荐:                                       实例代码 : http://download.csdn.net/detail/jiangtao_s ...

  2. netty获取玩家chanel_基于netty的TCP服务端如何给客户端发送消息,但是如何拿到客户端连接时的SocketChannel呢,菜鸟求助?...

    1.思路1 每个客户端连接时的SocketChannel保存在会话类sessionManager中的sessionIdMap中 问题: 1.客户端连接时确实将SocketChannel保存在会话类se ...

  3. 采用netty开发智能手表tcp服务端还是非常不错的

    采用netty开发智能手表tcp服务端还是非常不错的,经过单服务部署测试并发能达到10w,可以用于开发开发马蹄锁,儿童智能手表,其他智能设备,物联网等等,有啥有趣好玩的物联网可以进行交流一下

  4. 使用Netty实现客户端和服务端之间的双向通信

    欢迎阅读本篇文章 提示:本文只是提供部分核心代码,源码详见代码示例 使用Netty实现客户端和服务端之间的双向通信 前言 一.服务端 二.客户端 前言 在上个月的开发计划中,有一个系统控制喇叭播放的功 ...

  5. Netty教程02:Netty实战之TCP服务

    源码地址:https://gitee.com/pidaner/netty-class 官网:https://netty.io/ Netty is an asynchronous event-drive ...

  6. Qt多线程 TCP 服务端

    Qt实现 多线程 TCP 服务端 因为项目中要用到TCP客户端的并发处理,所以TCP服务端用多线程去实现是必要的.于是花了一大早上的时间写了一各Demo 如图: 主要关键代码: 关于线程的处理.关于线 ...

  7. java BIO tcp服务端向客户端消息群发代码教程实战

    前言 项目需要和第三方厂商的服务需要用TCP协议通讯,考虑到彼此双方可能都会有断网重连.宕机重启的情况,需要保证 发生上述情况后,服务之间能够自动实现重新通信.研究测试之后整理如下代码实现.因为发现客 ...

  8. 为什么TCP服务端需要调用bind函数而客户端通常不需要呢

    分享一下我老师大神的人工智能教程!零基础,通俗易懂!http://blog.csdn.net/jiangjunshow 也欢迎大家转载本篇文章.分享知识,造福人民,实现我们中华民族伟大复兴! 那一年, ...

  9. Android——Tcp服务端实现

    Android--Tcp服务端         传输控制协议(TCP,Transmission Control Protocol)是一种面向连接的.可靠的.基于字节流的传输层通信协议.         ...

最新文章

  1. Windows 10 中 Eclipse中无法添加Courier New字体的解决方法!
  2. geoserver 3_SD 2-3/15 PR调速阀德国HAWE哈威
  3. Oracle UNDO表空间损坏时的处理办法
  4. MySQL日期、字符串、数值型转换
  5. 电力安全工作规程发电厂和变电站电气部分_一招告诉你,何为电力系统
  6. Perl Redis 连接测试脚本,获取list长度
  7. jQuery使用手册之三 CSS操作
  8. c#string类型探讨
  9. STM32——EMWIN位图显示(四)
  10. DIY面试题 for AI产品经理 | “智能音箱半夜诡异笑声”的原因分析及建议方案
  11. cf 467 B. Sleepy Game
  12. Kronecker(克罗内克积)积
  13. 电脑升级建议(加固态硬盘还是内存、CPU、显卡)
  14. 【OpenPCDet】稀疏卷积SPConv-v1.2代码解读(5)
  15. Ubuntu下安装RabbbitVCS(图形化svn管理工具)-- Ubuntu也有TortoiseSVN
  16. 李嘉诚能否再续神话?“长科版”上市内幕
  17. python学习笔记 os.scandir遍历目录
  18. CAD高版本转低版本怎么转?分享几种好用的转换方法
  19. Chinese Dragon
  20. typora修改为百度搜索引擎

热门文章

  1. 口罩机超大功率超声波换能器 日本进口 现货供应
  2. [ROS2]下载Navigation2源码来学习
  3. N2N安卓版随时随地用和手机访问你的电脑NAS 等设备
  4. java操作sentinel_sentinel (史上最全+入门教程)
  5. 干货 | 猎头业务多元化之 Mapping 应该怎么做?
  6. howland电流源公式推导
  7. 手机安全卫士开发系列(2)——splash界面
  8. HDU 2389(二分最大匹配优化算法,Hopcroft-Carp)
  9. 西安邮电大学计算机学院军训,测控技术与仪器的培养方案
  10. Oculus 0.7.0 SDK DX11渲染流程