Java UDP Server的轻量级实现

实现方法

接收线程:只处理收包,收完后之后放入工作线程

发送线程:负责发送udp包到其它的server

工作线程:解析包体,实现业务逻辑

工作线程消息处理:在工作线程中解析出协议包体后,根据messageId实现消息处理

主要的java类

ServerManager.java

SenderThread.java

ReceiverThread.java

WorkThread.java

ServerManager.java 相关代码

public static void startListener() throws UDPException{
        try {
            if(tdUdpServerIsStart){
                throw new UDPException("服务已启动");
            }
            hasStartSet = false;
            tdUdpServerIsStart = true;

new Thread(new WorkThread()).start();
            new Thread(new ReceiverThread()).start();
            new Thread(new SenderThread()).start();
           
            datagramSocket = new DatagramSocket(PORT);
            datagramSocket.setSoTimeout(TIME_OUT);
            datagramSocket.setReceiveBufferSize(UDP_RECEIVER_BUFF_SIZE);

} catch (SocketException e) {
            log.error("",e);
            throw new UDPException("UDPServer 启动异常,端口:"+PORT,e);
        }
    }

SenderThread.java 相关代码:

逻辑:读取队列中的消息。并实现发送

@Overridepublic void run() {log.info("UDP Sender thread started");while(ServerManager.tdUdpServerIsStart) try {UDPMessage reqBody = senderQueue.take();//队列中没有时会一直等待byte[] bodyBit = reqBody.getMsgBits();if(reqBody.getHeader() == null){log.warn("Header is null");continue;}if(reqBody.getHeader().getMessageId() == null){log.warn("MessageId is null!");continue;}//            log.debug("Send head:"+reqBody.getHeader().toString());
//            log.info(String.format("Preprocess messageId:%s,len:%s,sessionId:%s to %s", reqBody.getHeader().getMessageId(),bodyBit.length, reqBody.getHeader().getSessionId(), reqBody.getHeader().getSocketAddress()));
//            byte[] body = ProtMsgData.makeMsgStream(reqBody.getSessionId(), reqBody.getMessageId().getMessageId(), bodyBit);byte[] body = reqBody.getMsgBits();DatagramPacket dataPacket = new DatagramPacket(body, body.length,reqBody.getHeader().getSocketAddress());//测试CODE
//            DatagramPacket dataPacket = new DatagramPacket(body, body.length,
//                    InetAddress.getByName("10.10.10.100"), 6060);//            DatagramSocket dataSocket = new DatagramSocket(reqBody.getHeader().getInetSocketAddress().getPort());
//            DatagramSocket dataSocket = new DatagramSocket(45677);DatagramSocket dataSocket = getSocket(reqBody.getHeader().getInetSocketAddress().getPort());if(dataSocket == null){log.warn(String.format("Send error!Not found socket by port:%s",reqBody.getHeader().getInetSocketAddress().getPort()));continue;}log.info(String.format("Send message to %s,id:%s,len:%s,data:%s",reqBody.getHeader().getSocketAddress().toString(),reqBody.getHeader().getMessageId(),body.length,ProtUtils.getHexByString(body)));dataSocket.send(dataPacket);//如果需要接收消息,先将记录保存起来//            if (reqBody.isNeedReceive()) {
//                UDPSessionManager.add(reqBody);
//            }} catch (Exception e) {log.error("Send message error", e);}}

ReceiverThread.java 相关代码

封装数据,给工作线程使用。也可以在这里解析协议的头,方便后续使用

@Overridepublic void run() {Init();}public void Init() {while(UCSIServerManager.tdUdpServerIsStart){try {byte[] receiveBody = new byte[BUFFER_SIZE];DatagramPacket dataPacketBody = new DatagramPacket(receiveBody, receiveBody.length);if(dataPacketBody == null) continue;ServerManager.receive(dataPacketBody);if(dataPacketBody == null) continue;//添加到工作线程队列ProtRespInfo respInfo = new ProtRespInfo(null,dataPacketBody); UDPWorkThread.push(respInfo);} catch (Exception e) {}}
}

WorkThread.java 相关代码

@Overridepublic void run() {while(UCSIServerManager.tdUdpServerIsStart){try {ProtRespInfo packet = workQueue.take();UDPReceiverHandler handler = new UDPReceiverHandler(packet);handler.receive();} catch (InterruptedException e) {e.printStackTrace();continue;}}
}public static void push(ProtRespInfo respInfo){try {workQueue.put(respInfo);} catch (Exception e) {e.printStackTrace();}
}

UDPReceiverHandler.java

实现在解析协议头和协议体。并根据messageId来调用对应的消息实现,我这里使用的注解的方式,程序启时扫描相关注解,如果新加协议只需要实现对应的接口,并加上注解。

public void receive(){        log.debug("UDPReceiver receive..."); UDPHeader header = getUDPHeader(respInfo.getDeviceType());if(header == null){log.warn("Not found head by port:"+respInfo.getDataPacket().getPort());return;}header.setSocketAddress(respInfo.getDataPacket().getSocketAddress());header.data = respInfo.getDataPacket().getData();header.len = header.getHeaderSize();header.decode();header.setDeviceType(respInfo.getDeviceType());respInfo.setHeader(header);if(respInfo.getHeader() == null || respInfo.getHeader().getMessageId() == null){log.warn("消息头错误,来自IP:"+respInfo.getHeader().getRemoteAddr()+",端口:"+respInfo.getHeader().getRemotePort());return;}try {IReceiverHandler handler = ReceiverHandlerManager.get(respInfo.getHeader().getMessageId().getMessageId());if(handler == null){log.error("没有找到对应的消息Handler,ID:"+respInfo.getHeader().getMessageId());return;}handler.handler(header,respInfo);} catch (Exception e) {log.error("",e);}}

小结

丢包严重:可以尝试增加DatagramSocket.setReceiveBufferSize来增加缓冲区大小

数据解析:定义好协议后,将协议头和协议体抽象出来。我这里抽象了一个UDPCoder(一些数据的公用read方法和write方法),UDPHeader,UDPBody。

PS:有空之后一定把源码整理出来

Java UDP Server的轻量级实现相关推荐

  1. 为什么 jmeter 分布式测试,一定要设置 java.rmi.server.hostname

    之前总结了 jmeter 分布式测试的过程,在部署过程中提到,要在 system.properties 中配置自己的 IP. 至于为什么要这么做,源于这一次 debug 的过程. 运行环境 mint, ...

  2. python udp 大文件_Python UDP服务器发送文本文件的行(Python UDP Server send lines of a text file)...

    Python UDP服务器发送文本文件的行(Python UDP Server send lines of a text file) 我需要模拟一个UDP服务器,它在无限循环中逐行发送文本文件的内容. ...

  3. JAVA UDP套接字编程

    JAVA UDP套接字编程 UDP套接字 无连接 非可靠传输 面向数据报 package com.lius.udp;import java.io.IOException; import java.ne ...

  4. issue no route to host 为什么 jmeter 分布式测试,一定要设置 java.rmi.server.hostname--(有效)

    之前总结了 jmeter 分布式测试的过程,在部署过程中提到,要在 system.properties 中配置自己的 IP. 至于为什么要这么做,源于这一次 debug 的过程. 运行环境 技术分享图 ...

  5. java UDP实现一个聊天工具

    题目: 假设Tom和Jerry利用Java UDP进行聊天,请为他们编写程序.具体如下: (1).Tom和Jerry聊天的双方都应该具有发送端和接收端: (2).利用DatagramSocket与Da ...

  6. Java UDP 编程简介.

    一.UDP 协议简介 UPD协议 是常见的 网络传输协议之一, 当然另1个是TCP协议. UPD协议 是一种不靠的协议. 是因为发送方不会关心接受方的状态, 直接向接收方发送数据包, 也就是说这个数据 ...

  7. Tomcat 之 启动tomcat时 错误: 代理抛出异常 : java.rmi.server.ExportException: Port already in use: 1099;...

    错误: 代理抛出异常 : java.rmi.server.ExportException: Port already in use: 1099; nested exception is:  java. ...

  8. Jodd - Java界的瑞士军刀轻量级工具包

    转载自 Jodd - Java界的瑞士军刀轻量级工具包! Jodd介绍 Jodd是对于Java开发更便捷的开源迷你框架,包含工具类.实用功能的集合,总包体积不到1.7M. Jodd构建于通用场景使开发 ...

  9. java.rmi.server.port_java.rmi.server.ExportException: internal error: ObjID already in use报错处理...

    由于在server.xml文件中使用配置了 在catalina.sh中也指定了对应 CATALINA_OPTS="$CATALINA_OPTS -Dcom.sun.management.jm ...

最新文章

  1. C#进行Visio二次开发之电气线路停电分析逻辑
  2. 一起谈.NET技术,.Net Discovery系列之-深入理解平台机制与性能影响 (中)
  3. 强化学习在机器人中的应用
  4. c/c++ 函数、常量、指针和数组的关系梳理
  5. 阿里巴巴计算机视觉领域最强阵容集结完毕团体参加CVPR会议
  6. 排序算法之希尔排序(Java)
  7. OpenSea联合创始人Devin Finzer:NFT空间在未来几年会产生更多应用 | FBEC 2020
  8. C#获取程序所在的目录
  9. 小爱音箱mini系统故障怎么办_Win7蓝牙连接小爱音箱mini的详细教程
  10. 航空插头的环境性能科普
  11. webview防止跳转到浏览器
  12. 解决:RuntimeError: mat1 and mat2 shapes cannot be multiplied (8x256 and 8x256)维度不匹配问题
  13. 第一代GCN: Spectral Networks and Locally Connected Networks on Graphs
  14. 日本首次利用IPS细胞分化成免疫细胞应用于癌症治疗
  15. 【区块链讲师会前访谈】结合应用场景 区块链共识算法如何选择?
  16. 解决运行web项目问题:localhost: 8080 is already in use
  17. DOSBox简单指令
  18. 一站式在线医疗解决方案,即构音视频技术助建互联网医疗
  19. 日本大阪强制公务员下班,每天18点半电脑自动关机…这能行?!
  20. 一道烧脑的双重否定逻辑题

热门文章

  1. java5.0下载_java虚拟机下载 v5.0 官方免费版
  2. <贪心算法>学习及经典实例分析
  3. el-select下拉框不同证件类型校验思路
  4. HC小区管理系统房屋收费功能说明
  5. 二进制空间权重矩阵_空间权重矩阵(SWM)
  6. mybatis动态sql及分页
  7. 天行健!君子以自强不息!
  8. 基于Java的课程管理系统
  9. 第三届互联网CIO-CTO班招募,CSDN 5个推荐名额,火热报名中
  10. CAD2021精简版安装教程附下载地址