websocket性能低?教你使用netty整合websocket(二)——实现点对点聊天(客户端与客户端通信)
前提
了解如何实现客户端和服务端通讯
上一篇博客——SpringBoot+Netty整合websocket(一)——客户端和服务端通讯
实现点对点聊天
后端
1.建立服务端WebSocketNettyServer
@Slf4j
@Configuration
public class WebSocketNettyServer {/** netty整合websocket的端口 */@Value("${netty.port}")private int port;public void run() throws InterruptedException {EventLoopGroup boss = new NioEventLoopGroup();EventLoopGroup worker = new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(boss,worker).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG,1024).childOption(ChannelOption.TCP_NODELAY,true).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {//web基于http协议的解码器ch.pipeline().addLast(new HttpServerCodec());//对大数据流的支持ch.pipeline().addLast(new ChunkedWriteHandler());//对http message进行聚合,聚合成FullHttpRequest或FullHttpResponsech.pipeline().addLast(new HttpObjectAggregator(1024 * 64));//websocket服务器处理对协议,用于指定给客户端连接访问的路径//该handler会帮你处理一些繁重的复杂的事//会帮你处理握手动作:handshaking(close,ping,pong) ping + pong = 心跳//对于websocket来讲,都是以frames进行传输的,不同的数据类型对应的frames也不同ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws"));//添加我们的自定义channel处理器ch.pipeline().addLast(new WebSocketHandler());}});log.info("服务器启动中,websocket的端口为:"+port);ChannelFuture future = bootstrap.bind(port).sync();future.channel().closeFuture().sync();} finally {//关闭主从线程池worker.shutdownGracefully();boss.shutdownGracefully();}}
}
2.建立聊天类
聊天类主要是消息本身的各种属性
@Data
public class ChatVO implements Serializable {/** 消息id */private Integer questionId;/**聊天信息类型*/private String chatMessageType;/**聊天内容*/private String content;/**发送方ID*/private Integer fromUserId;/**接收方ID*/private Integer toUserId;/**消息时间*/@JSONField(format="yyyy-MM-dd HH:mm:ss")private Date dateTime;}
3.封装聊天消息的VO
继承聊天类,拥有聊天类的属性,额外封装消息的额外属性(比如:消息类型、是否读取等)
@EqualsAndHashCode(callSuper = true)
@Data
public class ChatMsgVO extends ChatVO {/** 动作类型 */private Integer action;/** 消息签收状态 */private MsgSignFlagEnum signed;}
4.建立枚举类MsgSignFlagEnum
主要用于判断消息是否签收
public enum MsgSignFlagEnum {/** 消息是否签收 */unsign(0,"未签收"),signed(1,"已签收");@Getterpublic final int type;@Getterpublic final String value;private MsgSignFlagEnum(int type,String value) {this.type = type;this.value = value;}}
5.建立枚举类MsgActionEnum
主要用于确定客户端发送消息的动作类型
public enum MsgActionEnum {/** 第一次(或重连)初始化连接 */CONNECT(1,"第一次(或重连)初始化连接"),/** 聊天消息 */CHAT(2,"聊天消息"),/** 客户端保持心跳 */KEEPALIVE(3,"客户端保持心跳");public final Integer type;public final String content;private MsgActionEnum(Integer type,String content) {this.type = type;this.content = content;}
}
6.在写WebSocketHandler之前,将用户Id跟Channel做一个绑定
主要用于确定客户端信息
@Slf4j
public class UserChannelRel {/** 用户id为键,channel为值 */private static ConcurrentHashMap<Integer, Channel> manager = new ConcurrentHashMap<>();/** 添加客户端与channel绑定 */public static void put(Integer senderId,Channel channel) {manager.put(senderId,channel);}/** 根据用户id查询 */public static Channel get(Integer senderId) {return manager.get(senderId);}/** 根据用户id,判断是否存在此客户端(即客户端是否在线) */public static boolean isContainsKey(Integer userId){return manager.containsKey(userId);}/** 输出 */public static void output() {manager.forEach(( key, value ) -> log.info("UserId:" + key + ",ChannelId:" +value.id().asLongText()));}}
到这里只要再建立WebSocketHandler,就可以实现点对点聊天
7.建立WebSocketHandler
@Slf4j
public class WebSocketHandler extends SimpleChannelInboundHandler<Object> {/*** 客户端组* 用于记录和管理所有客户端的channel*/public static ChannelGroup channelGroup;static {channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);}/*** 接收客户端传来的消息** @param ctx* @param msg* @throws Exception*/@Overrideprotected void channelRead0 ( ChannelHandlerContext ctx, Object msg ) throws Exception {Channel currentChannel = ctx.channel();//文本消息if (msg instanceof TextWebSocketFrame) {String message = ((TextWebSocketFrame) msg).text();System.out.println("收到客户端消息:" + message);//json消息转换为Javabean对象ChatMsgVO chatMsgVO = null;try {chatMsgVO = JSONUtil.toBean(message, ChatMsgVO.class, true);} catch (JSONException e) {e.printStackTrace();System.out.println("json解析异常,发送的消息应该为json格式");return;}//得到消息的动作类型Integer action = chatMsgVO.getAction();//客户端第一次连接websocket或者重连时执行if (action.equals(MsgActionEnum.CONNECT.type)) {//当websocket第一次open的时候,初始化channel,把用的channel和userId关联起来Integer fromUserId = chatMsgVO.getFromUserId();UserChannelRel.put(fromUserId, currentChannel);//测试channelGroup.forEach(channel -> log.info(channel.id().asLongText()));UserChannelRel.output();} else if (action.equals(MsgActionEnum.CHAT.type)) {//聊天类型的消息,把聊天记录保存到redis,同时标记消息的签收状态[是否签收]Integer toUserId = chatMsgVO.getToUserId();//设置发送消息的时间chatVO.setDateTime(new DateTime());/* 发送消息给指定用户 *///判断消息是否符合定义的类型if (ChatTypeVerificationUtil.verifyChatType(chatVO.getChatMessageType())) {//发送消息给指定用户if (toUserId > 0 && UserChannelRel.isContainsKey(toUserId)) {sendMessage(toUserId, JSONUtil.toJsonStr(chatVO));}} else {//消息不符合定义的类型的处理}} else if (action.equals(MsgActionEnum.KEEPALIVE.type)) {//心跳类型的消息log.info("收到来自channel为[" + currentChannel + "]的心跳包");}}//二进制消息if (msg instanceof BinaryWebSocketFrame) {System.out.println("收到二进制消息:" + ((BinaryWebSocketFrame) msg).content().readableBytes());BinaryWebSocketFrame binaryWebSocketFrame = new BinaryWebSocketFrame(Unpooled.buffer().writeBytes("hello".getBytes()));//给客户端发送的消息ctx.channel().writeAndFlush(binaryWebSocketFrame);}//ping消息if (msg instanceof PongWebSocketFrame) {System.out.println("客户端ping成功");}//关闭消息if (msg instanceof CloseWebSocketFrame) {System.out.println("客户端关闭,通道关闭");Channel channel = ctx.channel();channel.close();}}/*** Handler活跃状态,表示连接成功* 当客户端连接服务端之后(打开连接)* 获取客户端的channel,并且放到ChannelGroup中去进行管理** @param ctx* @throws Exception*/@Overridepublic void handlerAdded ( ChannelHandlerContext ctx ) throws Exception {System.out.println("与客户端连接成功");channelGroup.add(ctx.channel());}/*** @param ctx* @throws Exception*/@Overridepublic void handlerRemoved ( ChannelHandlerContext ctx ) throws Exception {//当触发handlerRemoved,ChannelGroup会自动移除对应的客户端的channel//所以下面这条语句可不写
// clients.remove(ctx.channel());log.info("客户端断开,channel对应的长id为:" + ctx.channel().id().asLongText());log.info("客户端断开,channel对应的短id为:" + ctx.channel().id().asShortText());}/*** 异常处理** @param ctx* @param cause* @throws Exception*/@Overridepublic void exceptionCaught ( ChannelHandlerContext ctx, Throwable cause ) throws Exception {System.out.println("连接异常:" + cause.getMessage());cause.printStackTrace();ctx.channel().close();channelGroup.remove(ctx.channel());}@Overridepublic void userEventTriggered ( ChannelHandlerContext ctx, Object evt ) throws Exception {//IdleStateEvent是一个用户事件,包含读空闲/写空闲/读写空闲if (evt instanceof IdleStateEvent) {IdleStateEvent event = (IdleStateEvent) evt;if (event.state() == IdleState.READER_IDLE) {log.info("进入读空闲");} else if (event.state() == IdleState.WRITER_IDLE) {log.info("进入写空闲");} else if (event.state() == IdleState.ALL_IDLE) {log.info("channel关闭前,用户数量为:" + channelGroup.size());//关闭无用的channel,以防资源浪费ctx.channel().close();log.info("channel关闭后,用户数量为:" + channelGroup.size());}}}/*** 给指定用户发内容* 后续可以掉这个方法推送消息给客户端*/public void sendMessage ( Integer toUserId, String message ) {Channel channel = UserChannelRel.get(toUserId);channel.writeAndFlush(new TextWebSocketFrame(message));}/*** 群发消息*/public void sendMessageAll ( String message ) {channelGroup.writeAndFlush(new TextWebSocketFrame(message));}}
附
1.JSON处理
接受客户端的消息都是json数据类型的,这里采用的json处理使用的是Hutool工具包(完善并且轻量级的Java工具包)
如何使用?
直接引入依赖即可
<dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.3.5</version>
</dependency>
详细使用请参考官网:https://hutool.cn/docs/
2.TextWebSocketFrame: 在netty中,用于为websocket专门处理文本的对象,frame是消息的载体
3.SimpleChannelInboundHandler<Object>
中的Object
意味可以接收任意类型的消息。
4.ChatTypeVerificationUtil
主要用于验证消息类型(比如文本、图片、语音)等
public class ChatTypeVerificationUtil {/*** 功能描述:枚举:聊天信息的类型* @author RenShiWei* Date: 2020/2/6 15:58*/@Getter@NoArgsConstructor@AllArgsConstructorpublic enum ChatMessageTypeEnum {/**文本*/TEXT("text"),/**图片*/IMAGE("image"),/**音频*/VOICE("voice"),/**心跳包*/HEART("heart"),;private String chatType;}/*** 功能描述:* @param chatType 预判断类型* @return boolean*/public static boolean verifyChatType(String chatType) {//循环枚举for (ChatMessageTypeEnum airlineTypeEnum : ChatMessageTypeEnum.values()) {if (StringUtils.isNotBlank(chatType)&&chatType.equals(airlineTypeEnum.getChatType())){return true;}}return false;}}
8.在SpringBoot启动时,启动Netty整合的websocket服务
启动类实现CommandLineRunner 接口,重写run方法,用来在项目启动时预加载资源
/*** 声明CommandLineRunner接口,实现run方法,就能给启动项目同时启动netty服务*/
@SpringBootApplication
public class WebsocketApplication implements CommandLineRunner {/** 注入netty整合websocket的服务 CommandLineRunner */@Autowiredprivate WebSocketNettyServer webSocketNettyServer;public static void main(String[] args) throws InterruptedException {SpringApplication.run(WebsocketApplication.class, args);}/***声明CommandLineRunner接口,实现run方法,就能给启动项目同时启动netty服务*/@Overridepublic void run ( String... args ) throws Exception {webSocketNettyServer.run();}
}
在application.yml配置netty的启动端口
netty:port: 10101
连接netty整合的websocket路径:ws://127.0.0.1:10101/ws
可通过在线websocket进行测试:http://www.easyswoole.com/wstool.html
前端使用
连接地址
前端连接websocket地址:ws://127.0.0.1:10101/ws
10101为yml文件自定义的端口(可以自定义,但不能与项目端口重复)
第一次连接或者重连websocket
第一次连接或者重连websocket必须发送指定的json消息类型
例如:
{"fromUserId": "1","action":"1"
}
fromUserId
为连接websocket的用户id
action
为后端定义的消息动作(1代表的是首次连接或者重连)。
客户端发送的消息类型
{"questionId": "113",
"chatMessageType": "text",
"content": "01用户发送消息",
"fromUserId": "1",
"toUserId": "2",
"action":"2"
}
questionId
和chatMessageType
为业务需求,暂时可以忽略
content
为发送消息的内容
fromUserId
为发送方的用户id
toUserId
为接受方的用户id
action
为后端定义的消息动作(2代表的是聊天消息)。
心跳包
很多时候,服务器需要在一定的时间段内知道客户端是否还在线,所以可以采用客户端定期给服务器发送心跳数据的方式。
{"fromUserId": "1","action":"3"
}
fromUserId
为发送方的用户id
action
为后端定义的消息动作(3代表的是心跳包消息)。
注意:action
可以定义成常量,与后端对应,防止出错,也方便维护。
前端实现
具体的前端实现,略。可参考上一篇文章,一般需要根据具体的业务逻辑来写。
总结
- 现在可以实现点对点聊天,即客户端与客户端通信,但是只是实现了最基础的聊天功能,并不是很完善。
- 一般都需要将聊天的消息存储在数据库当中,保存聊天记录。但是聊天的业务一般比较频繁,如果每条消息都存储在数据库,会给数据库造成很大的压力。所以一般采用的方式都是采用redis缓存消息,等到积累到一定的程度,然后在将消息统一存储进数据库。
- 现在客户端发送消息只能是对方客户端也连接websocket(即在线状态下)才可以实现通讯,如何对方离线,则不行,缺少对离线消息的处理。
下一篇博客将总结如何将客户端通信的消息缓存进redis,并达到一定的条件下存储进mysql
SpringBoot+Netty整合websocket(三)——客户端聊天消息存储到redis和MySQL,并实现离线消息的处理
websocket性能低?教你使用netty整合websocket(二)——实现点对点聊天(客户端与客户端通信)相关推荐
- netty整合websocket支持自签证书出现netty websocket ssl Received fatal alert: certificate_unknown
自签证书 win+r cmd 生成自己jks文件,指向自己要生成jks的文件位置下,我直接生成到项目resources下 #换成自己的本地ip keytool -genkey -alias serve ...
- Springboot整合WebSocket(基于Stomp)
Springboot整合WebSocket(基于Stomp) 文章目录 Springboot整合WebSocket(基于Stomp) 参考链接 前言 STOMP 定义 STOMP Over WebSo ...
- 【SpringBoot框架篇】18.使用Netty加websocket实现在线聊天功能
文章目录 1.简介 2.最终功能实现的效果图 2.1.pc端 2.2.移动端 3.实战应用 3.1.引入依赖 3.2.配置文件 3.3.测试demo 3.3.1.消息内容实体类 3.3.2.处理请求的 ...
- 基于netty搭建websocket,实现消息的主动推送
基于netty搭建websocket,实现消息的主动推送 rpf_siwash https://www.jianshu.com/p/56216d1052d7 netty是由jboss提供的一款开源框架 ...
- 最简单的springboot整合websocket方式
简介 WebSocket是一种与HTTP不同的协议.两者都位于OSI模型的应用层,并且都依赖于传输层的TCP协议. 虽然它们不同,但是RFC 6455中规定:it is designed to wor ...
- js websocket同步等待_WebSocket硬核入门:200行代码,教你徒手撸一个WebSocket服务器...
本文原题"Node.js - 200 多行代码实现 Websocket 协议",为了提升内容品质,有较大修订. 1.引言 最近正在研究 WebSocket 相关的知识,想着如何能自 ...
- SpringBoot+Vue整合WebSocket实现前后端消息推送
场景 WebSocket HTTP 协议是一种无状态的.无连接的.单向的应用层协议.它采用了请求/响应模型.通信请求只能由客户端发起,服务端对请求做出应答处理. 这种通信模型有一个弊端:HTTP 协议 ...
- netty系列之:使用netty搭建websocket客户端
文章目录 简介 浏览器客户端 netty对websocket客户端的支持 WebSocketClientHandshaker WebSocketClientCompressionHandler net ...
- netty系列之:使用netty搭建websocket服务器
文章目录 简介 netty中的websocket websocket的版本 FrameDecoder和FrameEncoder WebSocketServerHandshaker WebSocketF ...
最新文章
- Spring Boot 2.0(五):Docker Compose + Spring Boot + Nginx + Mysql 实践
- 透过国外初创公司看高精度地图难题
- 蚂蚁金服安全应急响应中心上线 用户可提交漏洞
- python编程下载安卓版-python编程入门
- python教程:循环(while和for)
- python 跳出for循环_python中如何退出for循环
- 2017-11-17 为Python添加中文关键字 1
- Echart极坐标间隔显示-单类目轴
- ElasticSearch6.8.1集群搭建及Java客户端编写
- 查找算法-------插值查找
- SM2椭圆曲线公钥加密/解密算法
- iOS popViewController 失败
- 致Oracle开发者的学习资源清单
- [資源]RAID是什么意思?RAID的应用
- 爱普生AR眼镜应用场景图鉴:八个领域案例都在这里
- 大数据发展前景及就业方向【大数据专业讲座】
- Noise-contrastive Estimation(NCE)学习
- echarts画出特殊形状的柱状图
- 简单聊聊FPGA的一些参数——后篇
- Linux基础命令---find