前提

了解如何实现客户端和服务端通讯
上一篇博客——SpringBoot+Netty整合websocket(一)——客户端和服务端通讯

实现点对点聊天

后端

1.建立服务端WebSocketNettyServer

@Slf4j
@Configuration
public class WebSocketNettyServer {/** netty整合websocket的端口 */@Value("${netty.port}")private int port;public void run() throws InterruptedException {EventLoopGroup boss = new NioEventLoopGroup();EventLoopGroup worker = new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(boss,worker).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG,1024).childOption(ChannelOption.TCP_NODELAY,true).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {//web基于http协议的解码器ch.pipeline().addLast(new HttpServerCodec());//对大数据流的支持ch.pipeline().addLast(new ChunkedWriteHandler());//对http message进行聚合,聚合成FullHttpRequest或FullHttpResponsech.pipeline().addLast(new HttpObjectAggregator(1024 * 64));//websocket服务器处理对协议,用于指定给客户端连接访问的路径//该handler会帮你处理一些繁重的复杂的事//会帮你处理握手动作:handshaking(close,ping,pong) ping + pong = 心跳//对于websocket来讲,都是以frames进行传输的,不同的数据类型对应的frames也不同ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws"));//添加我们的自定义channel处理器ch.pipeline().addLast(new WebSocketHandler());}});log.info("服务器启动中,websocket的端口为:"+port);ChannelFuture future = bootstrap.bind(port).sync();future.channel().closeFuture().sync();} finally {//关闭主从线程池worker.shutdownGracefully();boss.shutdownGracefully();}}
}
2.建立聊天类

聊天类主要是消息本身的各种属性

@Data
public class ChatVO implements Serializable {/** 消息id */private Integer questionId;/**聊天信息类型*/private String chatMessageType;/**聊天内容*/private String content;/**发送方ID*/private Integer fromUserId;/**接收方ID*/private Integer toUserId;/**消息时间*/@JSONField(format="yyyy-MM-dd HH:mm:ss")private Date dateTime;}

3.封装聊天消息的VO

继承聊天类,拥有聊天类的属性,额外封装消息的额外属性(比如:消息类型、是否读取等)

@EqualsAndHashCode(callSuper = true)
@Data
public class ChatMsgVO extends ChatVO {/** 动作类型 */private Integer action;/** 消息签收状态 */private MsgSignFlagEnum signed;}

4.建立枚举类MsgSignFlagEnum

主要用于判断消息是否签收

public enum MsgSignFlagEnum {/** 消息是否签收 */unsign(0,"未签收"),signed(1,"已签收");@Getterpublic final int type;@Getterpublic final String value;private MsgSignFlagEnum(int type,String value) {this.type = type;this.value = value;}}

5.建立枚举类MsgActionEnum

主要用于确定客户端发送消息的动作类型

public enum MsgActionEnum {/** 第一次(或重连)初始化连接 */CONNECT(1,"第一次(或重连)初始化连接"),/** 聊天消息 */CHAT(2,"聊天消息"),/** 客户端保持心跳 */KEEPALIVE(3,"客户端保持心跳");public final Integer type;public final String content;private MsgActionEnum(Integer type,String content) {this.type = type;this.content = content;}
}

6.在写WebSocketHandler之前,将用户Id跟Channel做一个绑定

主要用于确定客户端信息

@Slf4j
public class UserChannelRel {/** 用户id为键,channel为值 */private static ConcurrentHashMap<Integer, Channel> manager = new ConcurrentHashMap<>();/** 添加客户端与channel绑定 */public static void put(Integer senderId,Channel channel) {manager.put(senderId,channel);}/** 根据用户id查询 */public static Channel get(Integer senderId) {return manager.get(senderId);}/** 根据用户id,判断是否存在此客户端(即客户端是否在线) */public static boolean isContainsKey(Integer userId){return manager.containsKey(userId);}/** 输出 */public static void output() {manager.forEach(( key, value ) -> log.info("UserId:" + key + ",ChannelId:" +value.id().asLongText()));}}

到这里只要再建立WebSocketHandler,就可以实现点对点聊天

7.建立WebSocketHandler

@Slf4j
public class WebSocketHandler extends SimpleChannelInboundHandler<Object> {/*** 客户端组* 用于记录和管理所有客户端的channel*/public static ChannelGroup channelGroup;static {channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);}/*** 接收客户端传来的消息** @param ctx* @param msg* @throws Exception*/@Overrideprotected void channelRead0 ( ChannelHandlerContext ctx, Object msg ) throws Exception {Channel currentChannel = ctx.channel();//文本消息if (msg instanceof TextWebSocketFrame) {String message = ((TextWebSocketFrame) msg).text();System.out.println("收到客户端消息:" + message);//json消息转换为Javabean对象ChatMsgVO chatMsgVO = null;try {chatMsgVO = JSONUtil.toBean(message, ChatMsgVO.class, true);} catch (JSONException e) {e.printStackTrace();System.out.println("json解析异常,发送的消息应该为json格式");return;}//得到消息的动作类型Integer action = chatMsgVO.getAction();//客户端第一次连接websocket或者重连时执行if (action.equals(MsgActionEnum.CONNECT.type)) {//当websocket第一次open的时候,初始化channel,把用的channel和userId关联起来Integer fromUserId = chatMsgVO.getFromUserId();UserChannelRel.put(fromUserId, currentChannel);//测试channelGroup.forEach(channel -> log.info(channel.id().asLongText()));UserChannelRel.output();} else if (action.equals(MsgActionEnum.CHAT.type)) {//聊天类型的消息,把聊天记录保存到redis,同时标记消息的签收状态[是否签收]Integer toUserId = chatMsgVO.getToUserId();//设置发送消息的时间chatVO.setDateTime(new DateTime());/* 发送消息给指定用户 *///判断消息是否符合定义的类型if (ChatTypeVerificationUtil.verifyChatType(chatVO.getChatMessageType())) {//发送消息给指定用户if (toUserId > 0 && UserChannelRel.isContainsKey(toUserId)) {sendMessage(toUserId, JSONUtil.toJsonStr(chatVO));}} else {//消息不符合定义的类型的处理}} else if (action.equals(MsgActionEnum.KEEPALIVE.type)) {//心跳类型的消息log.info("收到来自channel为[" + currentChannel + "]的心跳包");}}//二进制消息if (msg instanceof BinaryWebSocketFrame) {System.out.println("收到二进制消息:" + ((BinaryWebSocketFrame) msg).content().readableBytes());BinaryWebSocketFrame binaryWebSocketFrame = new BinaryWebSocketFrame(Unpooled.buffer().writeBytes("hello".getBytes()));//给客户端发送的消息ctx.channel().writeAndFlush(binaryWebSocketFrame);}//ping消息if (msg instanceof PongWebSocketFrame) {System.out.println("客户端ping成功");}//关闭消息if (msg instanceof CloseWebSocketFrame) {System.out.println("客户端关闭,通道关闭");Channel channel = ctx.channel();channel.close();}}/*** Handler活跃状态,表示连接成功* 当客户端连接服务端之后(打开连接)* 获取客户端的channel,并且放到ChannelGroup中去进行管理** @param ctx* @throws Exception*/@Overridepublic void handlerAdded ( ChannelHandlerContext ctx ) throws Exception {System.out.println("与客户端连接成功");channelGroup.add(ctx.channel());}/*** @param ctx* @throws Exception*/@Overridepublic void handlerRemoved ( ChannelHandlerContext ctx ) throws Exception {//当触发handlerRemoved,ChannelGroup会自动移除对应的客户端的channel//所以下面这条语句可不写
//        clients.remove(ctx.channel());log.info("客户端断开,channel对应的长id为:" + ctx.channel().id().asLongText());log.info("客户端断开,channel对应的短id为:" + ctx.channel().id().asShortText());}/*** 异常处理** @param ctx* @param cause* @throws Exception*/@Overridepublic void exceptionCaught ( ChannelHandlerContext ctx, Throwable cause ) throws Exception {System.out.println("连接异常:" + cause.getMessage());cause.printStackTrace();ctx.channel().close();channelGroup.remove(ctx.channel());}@Overridepublic void userEventTriggered ( ChannelHandlerContext ctx, Object evt ) throws Exception {//IdleStateEvent是一个用户事件,包含读空闲/写空闲/读写空闲if (evt instanceof IdleStateEvent) {IdleStateEvent event = (IdleStateEvent) evt;if (event.state() == IdleState.READER_IDLE) {log.info("进入读空闲");} else if (event.state() == IdleState.WRITER_IDLE) {log.info("进入写空闲");} else if (event.state() == IdleState.ALL_IDLE) {log.info("channel关闭前,用户数量为:" + channelGroup.size());//关闭无用的channel,以防资源浪费ctx.channel().close();log.info("channel关闭后,用户数量为:" + channelGroup.size());}}}/*** 给指定用户发内容* 后续可以掉这个方法推送消息给客户端*/public void sendMessage ( Integer toUserId, String message ) {Channel channel = UserChannelRel.get(toUserId);channel.writeAndFlush(new TextWebSocketFrame(message));}/*** 群发消息*/public void sendMessageAll ( String message ) {channelGroup.writeAndFlush(new TextWebSocketFrame(message));}}
1.JSON处理

接受客户端的消息都是json数据类型的,这里采用的json处理使用的是Hutool工具包(完善并且轻量级的Java工具包)
如何使用?
直接引入依赖即可

<dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.3.5</version>
</dependency>

详细使用请参考官网:https://hutool.cn/docs/

2.TextWebSocketFrame: 在netty中,用于为websocket专门处理文本的对象,frame是消息的载体
3.SimpleChannelInboundHandler<Object>中的Object意味可以接收任意类型的消息。
4.ChatTypeVerificationUtil主要用于验证消息类型(比如文本、图片、语音)等
public class ChatTypeVerificationUtil {/*** 功能描述:枚举:聊天信息的类型* @author RenShiWei* Date: 2020/2/6 15:58*/@Getter@NoArgsConstructor@AllArgsConstructorpublic enum ChatMessageTypeEnum {/**文本*/TEXT("text"),/**图片*/IMAGE("image"),/**音频*/VOICE("voice"),/**心跳包*/HEART("heart"),;private String chatType;}/*** 功能描述:* @param chatType 预判断类型* @return boolean*/public static boolean verifyChatType(String chatType) {//循环枚举for (ChatMessageTypeEnum airlineTypeEnum : ChatMessageTypeEnum.values()) {if (StringUtils.isNotBlank(chatType)&&chatType.equals(airlineTypeEnum.getChatType())){return true;}}return false;}}

8.在SpringBoot启动时,启动Netty整合的websocket服务

启动类实现CommandLineRunner 接口,重写run方法,用来在项目启动时预加载资源

/*** 声明CommandLineRunner接口,实现run方法,就能给启动项目同时启动netty服务*/
@SpringBootApplication
public class WebsocketApplication implements CommandLineRunner {/** 注入netty整合websocket的服务  CommandLineRunner */@Autowiredprivate WebSocketNettyServer webSocketNettyServer;public static void main(String[] args) throws InterruptedException {SpringApplication.run(WebsocketApplication.class, args);}/***声明CommandLineRunner接口,实现run方法,就能给启动项目同时启动netty服务*/@Overridepublic void run ( String... args ) throws Exception {webSocketNettyServer.run();}
}

在application.yml配置netty的启动端口

netty:port: 10101

连接netty整合的websocket路径:ws://127.0.0.1:10101/ws

可通过在线websocket进行测试:http://www.easyswoole.com/wstool.html

前端使用

连接地址

前端连接websocket地址:ws://127.0.0.1:10101/ws
10101为yml文件自定义的端口(可以自定义,但不能与项目端口重复)

第一次连接或者重连websocket

第一次连接或者重连websocket必须发送指定的json消息类型
例如:

{"fromUserId": "1","action":"1"
}

fromUserId为连接websocket的用户id
action为后端定义的消息动作(1代表的是首次连接或者重连)。

客户端发送的消息类型

{"questionId": "113",
"chatMessageType": "text",
"content": "01用户发送消息",
"fromUserId": "1",
"toUserId": "2",
"action":"2"
}

questionIdchatMessageType为业务需求,暂时可以忽略
content为发送消息的内容
fromUserId为发送方的用户id
toUserId为接受方的用户id
action为后端定义的消息动作(2代表的是聊天消息)。

心跳包

很多时候,服务器需要在一定的时间段内知道客户端是否还在线,所以可以采用客户端定期给服务器发送心跳数据的方式。

{"fromUserId": "1","action":"3"
}

fromUserId为发送方的用户id
action为后端定义的消息动作(3代表的是心跳包消息)。

注意:action可以定义成常量,与后端对应,防止出错,也方便维护。

前端实现

具体的前端实现,略。可参考上一篇文章,一般需要根据具体的业务逻辑来写。

总结

  1. 现在可以实现点对点聊天,即客户端与客户端通信,但是只是实现了最基础的聊天功能,并不是很完善。
  2. 一般都需要将聊天的消息存储在数据库当中,保存聊天记录。但是聊天的业务一般比较频繁,如果每条消息都存储在数据库,会给数据库造成很大的压力。所以一般采用的方式都是采用redis缓存消息,等到积累到一定的程度,然后在将消息统一存储进数据库。
  3. 现在客户端发送消息只能是对方客户端也连接websocket(即在线状态下)才可以实现通讯,如何对方离线,则不行,缺少对离线消息的处理。

下一篇博客将总结如何将客户端通信的消息缓存进redis,并达到一定的条件下存储进mysql
SpringBoot+Netty整合websocket(三)——客户端聊天消息存储到redis和MySQL,并实现离线消息的处理

websocket性能低?教你使用netty整合websocket(二)——实现点对点聊天(客户端与客户端通信)相关推荐

  1. netty整合websocket支持自签证书出现netty websocket ssl Received fatal alert: certificate_unknown

    自签证书 win+r cmd 生成自己jks文件,指向自己要生成jks的文件位置下,我直接生成到项目resources下 #换成自己的本地ip keytool -genkey -alias serve ...

  2. Springboot整合WebSocket(基于Stomp)

    Springboot整合WebSocket(基于Stomp) 文章目录 Springboot整合WebSocket(基于Stomp) 参考链接 前言 STOMP 定义 STOMP Over WebSo ...

  3. 【SpringBoot框架篇】18.使用Netty加websocket实现在线聊天功能

    文章目录 1.简介 2.最终功能实现的效果图 2.1.pc端 2.2.移动端 3.实战应用 3.1.引入依赖 3.2.配置文件 3.3.测试demo 3.3.1.消息内容实体类 3.3.2.处理请求的 ...

  4. 基于netty搭建websocket,实现消息的主动推送

    基于netty搭建websocket,实现消息的主动推送 rpf_siwash https://www.jianshu.com/p/56216d1052d7 netty是由jboss提供的一款开源框架 ...

  5. 最简单的springboot整合websocket方式

    简介 WebSocket是一种与HTTP不同的协议.两者都位于OSI模型的应用层,并且都依赖于传输层的TCP协议. 虽然它们不同,但是RFC 6455中规定:it is designed to wor ...

  6. js websocket同步等待_WebSocket硬核入门:200行代码,教你徒手撸一个WebSocket服务器...

    本文原题"Node.js - 200 多行代码实现 Websocket 协议",为了提升内容品质,有较大修订. 1.引言 最近正在研究 WebSocket 相关的知识,想着如何能自 ...

  7. SpringBoot+Vue整合WebSocket实现前后端消息推送

    场景 WebSocket HTTP 协议是一种无状态的.无连接的.单向的应用层协议.它采用了请求/响应模型.通信请求只能由客户端发起,服务端对请求做出应答处理. 这种通信模型有一个弊端:HTTP 协议 ...

  8. netty系列之:使用netty搭建websocket客户端

    文章目录 简介 浏览器客户端 netty对websocket客户端的支持 WebSocketClientHandshaker WebSocketClientCompressionHandler net ...

  9. netty系列之:使用netty搭建websocket服务器

    文章目录 简介 netty中的websocket websocket的版本 FrameDecoder和FrameEncoder WebSocketServerHandshaker WebSocketF ...

最新文章

  1. Spring Boot 2.0(五):Docker Compose + Spring Boot + Nginx + Mysql 实践
  2. 透过国外初创公司看高精度地图难题
  3. 蚂蚁金服安全应急响应中心上线 用户可提交漏洞
  4. python编程下载安卓版-python编程入门
  5. python教程:循环(while和for)
  6. python 跳出for循环_python中如何退出for循环
  7. 2017-11-17 为Python添加中文关键字 1
  8. Echart极坐标间隔显示-单类目轴
  9. ElasticSearch6.8.1集群搭建及Java客户端编写
  10. 查找算法-------插值查找
  11. SM2椭圆曲线公钥加密/解密算法
  12. iOS popViewController 失败
  13. 致Oracle开发者的学习资源清单
  14. [資源]RAID是什么意思?RAID的应用
  15. 爱普生AR眼镜应用场景图鉴:八个领域案例都在这里
  16. 大数据发展前景及就业方向【大数据专业讲座】
  17. Noise-contrastive Estimation(NCE)学习
  18. echarts画出特殊形状的柱状图
  19. 简单聊聊FPGA的一些参数——后篇
  20. Linux基础命令---find

热门文章

  1. vue3+ts+vue-qr 使用canvas生成彩色实点二维码
  2. Dell XPS15 9570 拆机升级内存
  3. java对PNG图片圆角处理 保持PNG透明背景
  4. Eclipse安装Aptana插件【在线安装】
  5. win10 计算机性能评分,Win10系统PC游戏掌机GPD Win10评测 究竟性能如何
  6. [前端学习笔记1]蓝桥杯Web应用开发组考点内容
  7. 一个名叫 c语言 的手机题库软件,C语言二级考试题库
  8. ssd 以太网设备驱动
  9. Flask 自定义模型类
  10. Java回顾——类和对象