NIO 通讯 netty 实现的socket 通讯

应用netty 实现异步通信.实现消息推送和反馈,广播的功能

netty:port: 8888    #监听端口bossThread: 2 #线程数workerThread: 2 #线程数keepalive: true #保持连接backlog: 100
     <dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>5.0.0.Alpha1</version></dependency>//配置参数类@Data@Component@ConfigurationProperties(prefix = "netty")public class NettyAcctConfig {private int port;private int bossThread;private int workerThread;private boolean keepalive;private int backlog;}// 启动协助类@Componentpublic class TCPServer {@Autowired@Qualifier("serverBootstrap")private ServerBootstrap serverBootstrap;@Autowired@Qualifier("tcpSocketAddress")private InetSocketAddress tcpPort;private Channel serverChannel;public void start() throws Exception {System.out.println("-------------------------###########-------------------------------------"+tcpPort);serverChannel =  serverBootstrap.bind(tcpPort).sync().channel().closeFuture().sync().channel();}@PreDestroypublic void stop() throws Exception {serverChannel.close();serverChannel.parent().close();}}//创建 程序的启动入口@Componentpublic class WebSocketService {@Autowiredprivate NettyAcctConfig nettyAccountConfig;@Bean(name = "bossGroup", destroyMethod = "shutdownGracefully")public NioEventLoopGroup bossGroup(){return new NioEventLoopGroup(nettyAccountConfig.getBossThread());}@Bean(name = "workerGroup", destroyMethod = "shutdownGracefully")public NioEventLoopGroup workerGroup(){return new NioEventLoopGroup(nettyAccountConfig.getWorkerThread());}@Bean(name = "tcpSocketAddress")public InetSocketAddress tcpPost(){return new InetSocketAddress(nettyAccountConfig.getPort());}@Bean(name = "tcpChannelOptions")public Map<ChannelOption<?>, Object> tcpChannelOptions(){Map<ChannelOption<?>, Object> options = new HashMap<ChannelOption<?>, Object>();options.put(ChannelOption.SO_KEEPALIVE, nettyAccountConfig.isKeepalive());options.put(ChannelOption.SO_BACKLOG, nettyAccountConfig.getBacklog());return options;}@Autowired@Qualifier("selfChannelInitializer")private WebSocketChannelHandler webSocketChannelHandler;@Bean(name = "serverBootstrap")public ServerBootstrap bootstrap(){ServerBootstrap b = new ServerBootstrap();b.group(bossGroup(), workerGroup()).channel(NioServerSocketChannel.class).handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(webSocketChannelHandler); //配置 请求协议和处理 handlerMap<ChannelOption<?>, Object> tcpChannelOptions = tcpChannelOptions();Set<ChannelOption<?>> keySet = tcpChannelOptions.keySet();for (@SuppressWarnings("rawtypes") ChannelOption option : keySet) {b.option(option, tcpChannelOptions.get(option));}return b;}}//配置 请求协议和处理 handler@Component@Qualifier("selfChannelInitializer")public class WebSocketChannelHandler extends ChannelInitializer<SocketChannel> {@Autowiredprivate WebSocketHandler webSocketHandler;@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast("http-codec",new HttpServerCodec());socketChannel.pipeline().addLast("aggregator",new HttpObjectAggregator(65535));socketChannel.pipeline().addLast("http-chunked",new ChunkedWriteHandler());socketChannel.pipeline().addLast("handler",webSocketHandler);  //这里不能使用new,不然在handler中不能注入依赖socketChannel.pipeline().addLast(new WebSocketServerProtocolHandler("/ws"));//如果需要拦截 第一次链接 就需要放在末尾}}// 客户请求业务逻辑@Data@Component@Qualifier("webSocketHandler")@ChannelHandler.Sharablepublic class WebSocketHandler extends SimpleChannelInboundHandler<Object>{private WebSocketServerHandshaker handshaker;private static ConcurrentHashMap<String,Map<String,Object>> userChannelMap  = new ConcurrentHashMap<String,Map<String,Object>>();@Autowired@Qualifier("tcpSocketAddress")private InetSocketAddress tcpPort;private  static  final String WEB_SOCKET_URL="ws://localhost:8888/websocket";private ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);//服务端处理客户端的核心方法@Overrideprotected void messageReceived(ChannelHandlerContext ctx, Object o) throws Exception {//处理客户端向服务端发起http握手请求if(o instanceof FullHttpRequest){handHttpRequest(ctx,(FullHttpRequest) o);}else if(o instanceof WebSocketFrame ){ //处理websocket连接业务handlerWebsocketFrame(ctx, (WebSocketFrame) o);}}private  void handlerWebsocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame){//判断是否未关闭websocket的请求if(frame instanceof CloseWebSocketFrame){handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());}//判断是否为ping消息if(frame instanceof PingWebSocketFrame){ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));return;}//判断是否为二进制消息if(!(frame instanceof TextWebSocketFrame)){System.out.println("目前不支持二进制消息");throw new RuntimeException("["+this.getClass().getName()+"] 不支持消息");}//返回应答消息//获取客户端向服务端发送的消息String request = ((TextWebSocketFrame) frame).text();//获取用户信息String userId = "";for(String key : userChannelMap.keySet()){Map channelMap  =  userChannelMap.get(key);if(ctx.channel().id().equals(channelMap.get("channelId"))){userId = key;}}TextWebSocketFrame tws=new TextWebSocketFrame(sendMsgObjStr(userId,request));//所有连接上的channel 都发送消息channels.writeAndFlush(tws);}/*** 功能描述: <br>* 〈处理客户端向服务端发起的握手请求的〉** @return:* @since: 1.0.0* @Author:JG* @Date: 2019/9/19 21:12*/private  void  handHttpRequest(ChannelHandlerContext ctx,FullHttpRequest req){//String uri = req.getUri();//连接时配置当前用户  this.websocket = new WebSocket('ws://localhost:8888/websocket?name='+ this.userName)String userId = null;if (null != uri && uri.contains("/websocket") && uri.contains("?")) {String[] uriArray = uri.split("\\?");if (null != uriArray && uriArray.length > 1) {String[] paramsArray = uriArray[1].split("=");if (null != paramsArray && paramsArray.length > 1) {userId = paramsArray[1];Map<String,Object> channelMap =  new HashMap<>();channelMap.put("channelId",ctx.channel().id());channelMap.put("channelObj",ctx.channel());userChannelMap.put(userId,channelMap);}}}if(!req.getDecoderResult().isSuccess() || !("websocket".equals(req.headers().get("upgrade")))){sendHttpResponse(ctx,req,new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));return;}//http  websocket 握手请求处理逻辑String url="ws://"+tcpPort+"/websocket";WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(url,null,false);handshaker=wsFactory.newHandshaker(req);if(handshaker==null){WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());}else{handshaker.handshake(ctx.channel(),req);}//添加channelchannels.add(ctx.channel());channels.writeAndFlush(new TextWebSocketFrame(sendMsgObjStr(userId,"加入")));}/*** 功能描述: <br>* 〈处理 服务端向客户端的响应信息〉*[ctx, request]* @return:void* @since: 1.0.0* @Author:JG* @Date: 2019/9/19 21:15*/private  void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse resp){if(resp.getStatus().code()!=200){ByteBuf buffer = Unpooled.copiedBuffer(resp.getStatus().toString(), CharsetUtil.UTF_8);resp.content().writeBytes(buffer);buffer.release();}//服务端向客户端发送数据ChannelFuture future=ctx.channel().writeAndFlush(resp);if(resp.getStatus().code()!=200){future.addListener(ChannelFutureListener.CLOSE);}}//出现异常时调用@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();//关闭连接ctx.close();}//服务端读取信息结束时调用@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.flush();}public String sendMsgObjStr(String userId,String message){Gson g = new Gson();SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:MM:ss");Map<String,Object> messageMap = new HashMap();messageMap.put("userId",userId);messageMap.put("message",message);messageMap.put("createTime",dateFormat.format(new Date()));return   g.toJson(messageMap);}//客户端与服务端创建连接时调用/*@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {channels.add(ctx.channel())channels.writeAndFlush(new TextWebSocketFrame("[服务器] - "+ctx.channel().remoteAddress()+"加入"));}*///客户端与服务端断开@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {channels.remove(ctx.channel());String userId = "";for(String key : userChannelMap.keySet()){Map channelMap  =  userChannelMap.get(key);if(ctx.channel().id().equals(channelMap.get("channelId"))){userId = key;}}userChannelMap.remove(userId);channels.writeAndFlush(new TextWebSocketFrame(sendMsgObjStr(userId,"下线")));}public void sendAllMessage(String message) {channels.writeAndFlush(new TextWebSocketFrame(sendMsgObjStr("System",message)));}// 此为单点消息public void sendOneMessage(String userId, String message) {for(String key : userChannelMap.keySet()){if(key.equals(userId)){Map channelMap  =  userChannelMap.get(key);Channel channel = (Channel) channelMap.get("channelObj");channel.writeAndFlush(new TextWebSocketFrame(sendMsgObjStr("System",message)));}}}}//前台 <script type="text/javascript">var socket;if(!window.WebSocket){window.WebSocket=window.MozWebSocket;}if(window.WebSocket){socket = new WebSocket("ws://localhost:8888/websocket");socket.onmessage=function (e) {var tag= document.getElementById("responseContext");tag.value += e.data + "\r\n";};socket.onopen =function (p1) {var tag= document.getElementById("responseContext");tag.value =" 连接已经建立 请进行后续操作 \r\n";}socket.onclose= function (p1) {var tag= document.getElementById("responseContext");tag.value = "";tag.value= "连接已经关闭"}}else{alert("你的浏览器不支持websocket")}function  send (messgae) {if(!window.WebSocket){return}if(socket.readyState == WebSocket.OPEN){socket.send(messgae);}else{alert("websocket 连接未建立")}}</script>

SpringBoot websocket

STOMP(Simple Text-Orientated Messaging Protocol) 面向消息的简单文本协议WebSocket是一个消息架构,不强制使用任何特定的消息协议,它依赖于应用层解释消息的含义;与处在应用层的HTTP不同,WebSocket处在TCP上非常薄的一层,会将字节流转换为文本/二进制消息,因此,对于实际应用来说,WebSocket的通信形式层级过低,因此,可以在 WebSocket 之上使用 STOMP协议,来为浏览器 和 server间的 通信增加适当的消息语义。如何理解 STOMP 与 WebSocket 的关系:
1) HTTP协议解决了 web 浏览器发起请求以及 web 服务器响应请求的细节,假设 HTTP 协议 并不存在,只能使用 TCP 套接字来 编写 web 应用,你可能认为这是一件疯狂的事情;
2) 直接使用 WebSocket(SockJS) 就很类似于 使用 TCP 套接字来编写 web 应用,因为没有高层协议,就需要我们定义应用间所发送消息的语义,还需要确保连接的两端都能遵循这些语义;
3) 同 HTTP 在 TCP 套接字上添加请求-响应模型层一样,STOMP 在 WebSocket 之上提供了一个基于帧的线路格式层,用来定义消息语义;

STOMP 实现socket 通讯

核心类: WebSocketMessageBrokerConfigurer 实现类
@EnableWebSocketMessageBroker
@Configuration表示使用STOMP协议来传输基于消息代理的消息,此时可以在@Controller类中使用@MessageMapping 来接受stomp的send请求主要实现:实现  WebSocketMessageBrokerConfigurer 接口, 重写 registerStompEndpoints 通过 StompEndpointRegistry 注册连接地址,注册HandshakeInterceptor 实现类,实现在握手前后的操作。 注册DefaultHandshakeHandler 实现类 ,用于配置用户名称或者一些权限操作重写 configureMessageBroker  配置代理目的地的前缀为 /topic 或者 /queue , /app 客户端发送前缀 即请求MessageMapping 的路径前缀。/user 点对点通讯时broke前缀,即配置心跳测试发布/订阅:Topic 可以重复消费,不管消费与否不会保存  广播点对点:Queue 不可以重复消费, 没人消费会保存重写 configureWebSocketTransport 设置 消息字节大小,缓存大小,发送超时时间重写 configureClientInboundChannel 设置输入消息通道 配置线程池  通过注册 ChannelInterceptor 实现类,实现 发送前和发送完毕的 一些核心操作,信息过滤 ,持久等。
    <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>

WebSocketMessageBrokerConfigurer 实现类

/*** STOMP协议* EnableWebSocketMessageBroker 表示使用STOMP协议来传输基于消息代理的消息,此时可以在@Controller类中使用@MessageMapping*/
@Configuration
@EnableWebSocketMessageBroker
public class StompMessageBroke implements WebSocketMessageBrokerConfigurer {public void registerStompEndpoints(StompEndpointRegistry registry) {/*** 注册 Stomp的端点* addEndpoint:添加STOMP协议的端点。这个HTTP URL是供WebSocket或SockJS客户端访问的地址* withSockJS:指定端点使用SockJS协议* setHandshakeHandler 可以设置监控 socket运行和用户等*/registry.addEndpoint("/websocket-simple").setAllowedOrigins("*") // 添加允许跨域访问.addInterceptors(new MyHandShakeInterceptor()).setHandshakeHandler(new PrincipalHandshakeHandler()).withSockJS();}@Overridepublic void configureMessageBroker(MessageBrokerRegistry registry) {/*** 配置消息代理*  enableStompBrokerRelay 配置外部的STOMP服务 设置密码 代理host 默认为localhost  代理端口 默认为61613* 启动简单Broker* 可以配置多个,此段代码配置代理目的地的前缀为 /topic 或者 /queue 我们就可以给订阅配置的域的客户端推送消息*  固定到某个用户是用/queue 的前缀 前端订阅 /user/queue 的前缀*  发布/订阅:Topic 可以重复消费,不管消费与否不会保存  广播*  点对点:Queue 不可以重复消费, 没人消费会保存*/// 自定义调度器,用于控制心跳线程ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();taskScheduler.setPoolSize(1);taskScheduler.setThreadNamePrefix("websocket-heartbeat-thread-");taskScheduler.initialize();registry.enableSimpleBroker("/topic", "/queue").setHeartbeatValue(new long[]{10000,10000}).setTaskScheduler(taskScheduler);//客户端发送前缀  都会路由到带有@MessageMapping 注解的方法中registry.setApplicationDestinationPrefixes("/app");// 配置一对一消息前缀 默认也是/user/registry.setUserDestinationPrefix("/user");}//配置发送与接收的消息参数,可以指定消息字节大小,缓存大小,发送超时时间@Overridepublic void configureWebSocketTransport(WebSocketTransportRegistration registration) {registration.setMessageSizeLimit(500 * 1024 * 1024);registration.setSendBufferSizeLimit(1024 * 1024 * 1024);registration.setSendTimeLimit(200000);}/*** 设置输入消息通道 配置线程池* 配置消息拦截器 实现ChannelInterceptor接口* @param registration*/@Overridepublic void configureClientInboundChannel(ChannelRegistration registration) {registration.taskExecutor().corePoolSize(5).maxPoolSize(10).keepAliveSeconds(30);registration.interceptors(new InMessageIntercepter());}/*** 设置输出消息通道* @param registration*/@Overridepublic void configureClientOutboundChannel(ChannelRegistration registration) {registration.taskExecutor().corePoolSize(5).maxPoolSize(10).keepAliveSeconds(30);registration.interceptors(new OutMessageIntercepter());}}

DefaultHandshakeHandler 实现类


public class PrincipalHandshakeHandler extends DefaultHandshakeHandler {@Overrideprotected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {/*** 这边可以按你的需求,如何获取唯一的值,既unicode* 得到的值,会在监听处理连接的属性中,既WebSocketSession.getPrincipal().getName()* 也可以自己实现Principal()*/if (request instanceof ServletServerHttpRequest) {ServletServerHttpRequest servletServerHttpRequest = (ServletServerHttpRequest) request;HttpServletRequest httpRequest = servletServerHttpRequest.getServletRequest();/*** 这边就获取你最熟悉的陌生人,携带参数,你可以cookie,请求头,或者url携带,这边我采用url携带*/final String userId = httpRequest.getParameter("userId");if (StringUtils.isEmpty(userId)) {return null;}return new Principal() {@Overridepublic String getName() {return userId;}};}return null;}
}

HandshakeInterceptor 实现类

//OriginHandshakeInterceptor:检查Origin头字段的合法性public class MyHandShakeInterceptor implements HandshakeInterceptor {@Overridepublic boolean beforeHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Map<String, Object> map) throws Exception {System.out.println(this.getClass().getCanonicalName() + "http协议转换websoket协议进行前, 握手前"+serverHttpRequest.getURI());return true;}@Overridepublic void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {System.out.println(this.getClass().getCanonicalName() + "握手成功后...");}
}

ChannelInterceptor 实现类

/*** 客户端发送消息处理*/
public class InMessageIntercepter implements ChannelInterceptor {@Overridepublic Message<?> preSend(Message<?> message, MessageChannel channel) {//发送前消息过滤//离开进入操作Object payload = message.getPayload();MessageHeaders headers = message.getHeaders();SimpMessageType simpMessageType  = (SimpMessageType)headers.get("simpMessageType");String simpleSessionId = (String) headers.get("simpSessionId");if("CONNECT".equals(simpMessageType.name())){}if("DISCONNECT".equals(simpMessageType.name())){}if("SEND".equals(simpMessageType.name())){System.out.println(simpMessageType.name()+"==in==");}if(headers.containsKey("stompCommand")){StompCommand stompCommand = (StompCommand) headers.get("stompCommand");}return message;}@Overridepublic void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {//发送成功后进行 数据存储 过滤心跳Object payload = message.getPayload();MessageHeaders headers = message.getHeaders();}
}/***  服务端发送消息处理*/
public class OutMessageIntercepter implements ChannelInterceptor {@Overridepublic Message<?> preSend(Message<?> message, MessageChannel channel) {//发送前消息过滤Object payload = message.getPayload();MessageHeaders headers = message.getHeaders();SimpMessageType simpMessageType  = (SimpMessageType)headers.get("simpMessageType");String simpleSessionId = (String) headers.get("simpSessionId");if("MESSAGE".equals(simpMessageType.name())){System.out.println(simpMessageType.name()+"==out==");}return message;}@Overridepublic void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {Object payload = message.getPayload();MessageHeaders headers = message.getHeaders();if(sent){SimpMessageType simpMessageType  = (SimpMessageType)headers.get("simpMessageType");String simpleSessionId = (String) headers.get("simpSessionId");if("MESSAGE".equals(simpMessageType.name())){//发送成功后进行 数据存储 过滤心跳}}}}

controller

@Controller
public class BroadcastCtl {// 收到消息记数private AtomicInteger count = new AtomicInteger(0);@Autowiredprivate SimpMessagingTemplate messagingTemplate;/*** @MessageMapping 指定要接收消息的地址,类似@RequestMapping。除了注解到方法上,也可以注解到类上* @SendTo默认 消息将被发送到与传入消息相同的目的地* 消息的返回值是通过{@link org.springframework.messaging.converter.MessageConverter}进行转换* @param requestMessage* @return*/@MessageMapping({"/receive"})@SendTo("/topic/getResponse")//@SendToUser("/topic/getResponse")//前端推送的地址  send 调用的地址,发送到 /topic/getResponse   然后订阅 /topic/getResponse 用户就会收到 广播public ResponseMessage broadcast(RequestMessage requestMessage){ResponseMessage responseMessage = new ResponseMessage();responseMessage.setContent("BroadcastCtl receive [" + count.incrementAndGet() + "] records");return responseMessage;}@CrossOrigin@MessageMapping({"/chatRoom"})  //前端推送的 地址 推送到指定的位置 room 进行广播public void messageHandling(RequestMessage requestMessage) throws Exception {String destination = "/topic/" + HtmlUtils.htmlEscape(requestMessage.getRoom());String sender = HtmlUtils.htmlEscape(requestMessage.getSender());  //htmlEscape  转换为HTML转义字符表示String type = HtmlUtils.htmlEscape(requestMessage.getType());String content = HtmlUtils.htmlEscape(requestMessage.getContent());ResponseMessage response = new ResponseMessage(sender, type, content,new Date());messagingTemplate.convertAndSend(destination, response);}@CrossOrigin@MessageMapping({"/toUser"})  //前端推送的 地址  针对某人推送public void singleToUser(RequestMessage requestMessage) throws Exception {String sender = HtmlUtils.htmlEscape(requestMessage.getSender());  //htmlEscape  转换为HTML转义字符表示String type = HtmlUtils.htmlEscape(requestMessage.getType());String content = HtmlUtils.htmlEscape(requestMessage.getContent());ResponseMessage response = new ResponseMessage(sender, type, content,new Date());// 点对点messagingTemplate.convertAndSendToUser(requestMessage.getReceiver(), "/queue/toUser",response);}@GetMapping("socketStomp")public ModelAndView page3(){return new ModelAndView("socketStomp");}@GetMapping("testSendToAll")public void sendQueueMessage() {System.out.println("后台群发推送推送!");ResponseMessage response = new ResponseMessage("admin", "text", "后台群发推送推送!",new Date());messagingTemplate.convertAndSend("/topic/getResponse",response);}
}

实体类 消息请求 和 返回实体

ublic class RequestMessage implements Serializable {private static final long serialVersionUID = -141796491940503606L;private Long  messageId;private String sender;//消息发送者private String room;//房间号private String type;//消息类型private String content;//消息内容private String receiver;//消息接受者private Date sendTime;public Date getSendTime() {return sendTime;}public void setSendTime(Date sendTime) {this.sendTime = sendTime;}public RequestMessage() {}public RequestMessage(String sender,String receiver,String room, String type, String content) {this.sender = sender;this.receiver = receiver;this.room = room;this.type = type;this.content = content;}public String getSender() {return sender;}public String getRoom() {return room;}public String getType() {return type;}public String getContent() {return content;}public String getReceiver() {return receiver;}public Long getMessageId() {return messageId;}public void setMessageId(Long messageId) {this.messageId = messageId;}public void setSender(String sender) {this.sender = sender;}public void setRoom(String room) {this.room = room;}public void setType(String type) {this.type = type;}public void setContent(String content) {this.content = content;}public void setReceiver(String receiver) {this.receiver = receiver;}}public class ResponseMessage implements Serializable {private static final long serialVersionUID = -141964919405031606L;private Long  messageId;private String sender;private String type;private String content;private Date sendTime;public Date getSendTime() {return sendTime;}public void setSendTime(Date sendTime) {this.sendTime = sendTime;}public ResponseMessage() {}public ResponseMessage(String sender, String type, String content,Date sendTime) {this.sender = sender;this.type = type;this.sendTime =sendTime;this.content = content;}public String getSender() {return sender;}public String getType() {return type;}public String getContent() {return content;}public Long getMessageId() {return messageId;}public void setMessageId(Long messageId) {this.messageId = messageId;}public void setSender(String sender) {this.sender = sender;}public void setType(String type) {this.type = type;}public void setContent(String content) {this.content = content;}}
stomp 前端操作
创建连接 订阅 发送
![在这里插入图片描述](https://img-blog.csdnimg.cn/f7287e9e9e9041378aec3feb8b90488d.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBA6I-c5LiN5piv6ZSZ,size_20,color_FFFFFF,t_70,g_se,x_16)
<!DOCTYPE html><html><head><meta charset="utf-8"><title>SOCKET</title><!-- VUE --><script src="https://cdn.jsdelivr.net/npm/vue/dist/vue.js"></script><!-- FormMaking --><link rel="stylesheet" href="https://unpkg.com/element-ui/lib/theme-chalk/index.css"><!-- 引入样式 --><link rel="stylesheet" href="https://unpkg.com/element-ui/lib/theme-chalk/index.css"><!-- 引入组件库 --><script src="https://unpkg.com/element-ui/lib/index.js"></script><script src="https://cdn.bootcss.com/jquery/3.3.1/jquery.js"></script><script type="application/javascript" src="http://cdn.bootcss.com/stomp.js/2.3.3/stomp.min.js"></script><script type="application/javascript" src="https://cdn.bootcss.com/sockjs-client/1.1.4/sockjs.min.js"></script></head><body><div id="vue_det" style="margin:20px;"><el-container><el-header><h1>HELLOW SOCKET</h1></el-header><el-main><div style="background-color: #b3e19d;height: 600px;overflow-y:auto"><div v-for="item,index in messagList"  style="height: 50px;" :class="item.self?'textSelf':'textOther'"> {{item.showMessage}} </div></div></el-main><el-footer><el-form :inline="true"  style="margin-top: 10px" ><el-form-item label="当前人" prop="userId"><el-input type="input" v-model="userId"  :disabled="true"></el-input></el-form-item><el-form-item label="输入信息" prop="text"><el-input type="textarea" v-model="text"></el-input></el-form-item><el-form-item label="接收人" prop="receiver"><el-input type="input" v-model="receiver"></el-input></el-form-item><el-form-item><el-button type="primary" @click="send">发送</el-button></el-form-item></el-form></el-footer></el-container></div>
</body><script type="text/javascript">var vm = new Vue({el: '#vue_det',data: {userId:"",lockReconnect:false, //防止重复连接text:"",receiver:"",messagList:[]},mounted() {//测试 用户名 随机var arr= ["a","b","c","d"];var  name = "";for(var x =0 ; x <4;x++){var key = Math.floor(Math.random()*10 /3)name += arr[key];}this.userId = name;// wss 为请求 https 安全协议//非安全的var url = 'http://localhost:28087/websocket-simple?userId='+ this.userIdthis.createWebSocket(url);},methods: {createWebSocket(wsUrl) {var that = this;console.log(wsUrl);if(typeof(WebSocket) == "undefined") {console.log("您的浏览器不支持WebSocket");}else {var socket1 = new SockJS(wsUrl);that.stomp = Stomp.over(socket1);//连接that.stomp.connect({}, function (frame) {//订阅主题that.stomp.subscribe("/topic/getResponse", function (res) {console.log(res.body);that.msgRemake(res.body)});//群发订阅that.stomp.subscribe("/queue/getResponse", function (res) {console.log(res.body);that.msgRemake(res.body)});//用户模式that.stomp.subscribe("/user/queue/toUser", function (res) {console.log(res.body);that.msgRemake(res.body)});});}},msgRemake(data){var obj = JSON.parse(data)obj.showMessage  = obj.sender +"  "+ obj.sendTime +"\r\n"+obj.content;if(obj.sender === this.userId){obj.self = true;}else{obj.self = false;const h = this.$createElement;this.$notify({title: '标题名称',message: h('i', { style: 'color: teal'}, obj.showMessage)});}this.messagList.push(obj);},send(){var msg = {"sender":this.userId,"receiver":this.receiver,"room" :"","type" :"text","content":this.text,"sendTime":new Date()};this.msgRemake(JSON.stringify(msg)); //增加到屏幕this.stomp.send("/app/toUser", {}, JSON.stringify(msg));},close(){this.stomp.disconnect();}},beforeDestroy(){this.close();}})</script><style scoped>.el-header {background-color: #B3C0D1;color: #333;text-align: center;height:180px;}.el-main {padding: 0;}.el-footer {background-color: #B3C0D1;color: #333;line-height: 30px;}.el-carousel__item h3 {color: #475669;font-size: 14px;opacity: 0.75;line-height: 200px;margin: 0;}.el-carousel__item:nth-child(2n) {background-color: #99a9bf;}.el-carousel__item:nth-child(2n+1) {background-color: #d3dce6;}.textOther {text-align: left;background-color: #00a0e9;border: solid 1px black}.textSelf {text-align: right;background-color: #BAC498;border: solid 1px black}</style></html>

WebSocket 实现socket 通讯

核心类 实现 WebSocketConfigurer
@EnableWebSocket
@Configuration
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {/*** WebSocketHandlerRegistry 方式注册不用的路径不用的 AbstractWebSocketHandler 实现类* @param registry*/@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {registry.addHandler(new WebSocketAuthHandler(), "socketJs").addInterceptors(new MyHandShakeInterceptor()) //三次握手.setAllowedOrigins("*").withSockJS();}/***  配置 ServerEndpointExporter 就可以 用@ServerEndpoint 来注册一个服务类.*     @ServerEndpoint("/imserver/{userId}")*     @OnOpen*     @OnClose** @return*/@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}/**雪花算法生成id  业务使用的我临时放在这里来注入容器 **/@Beanpublic IdWorker idWorker(){return new IdWorker(1,1,1);}
}
方式1 启动 ServerEndpointExporter 通过  @ServerEndpoint("/imserver/{userId}") 发布 websocket

@ServerEndpoint("/imserver/{userId}")
public class SocketEndPointService {@Autowiredprivate IdWorker idWorker;//针对独立的会话private static ConcurrentHashMap<String,SocketEndPointService> singleSessions = new ConcurrentHashMap<>();//针对房间群里session管理private static ConcurrentHashMap<String, Set<SocketEndPointService>> roomSessions = new ConcurrentHashMap<>();/**与某个客户端的连接会话,需要通过它来给客户端发送数据*/private Session session;/**接收userId*/private String userId="";/**接收userId*/private String roomId="";/*** 连接建立成功调用的方法*/@OnOpenpublic void onOpen(Session session, @PathParam("userId") String userId, @RequestParam("roomId") String roomId) {this.session = session;this.userId=userId;this.roomId=roomId;if(StringUtils.isEmpty(roomId)){singleSessions.put(userId,this);}else{if(roomSessions.contains(roomId)){Set<SocketEndPointService> webSocketServers = roomSessions.get(roomId);webSocketServers.add(this);}}}/*** 连接关闭调用的方法*/@OnClosepublic void onClose() {if(StringUtils.isEmpty(roomId)) {Set<SocketEndPointService> webSocketServers = roomSessions.get(roomId);webSocketServers.remove(this);}else{singleSessions.remove(this.userId);}}/*** 收到客户端消息后调用的方法** @param message 客户端发送过来的消息*/@OnMessagepublic void onMessage(String message, Session session) {RequestMessage requestMessage = JSONUtil.toBean(message,RequestMessage.class);if(requestMessage.getMessageId() == null){requestMessage.setMessageId(idWorker.nextId());}ResponseMessage responseMessage = new ResponseMessage();BeanUtil.copyProperties(responseMessage,requestMessage);responseMessage.setSendTime(new Date());if(StringUtils.isEmpty(roomId)) {Set<SocketEndPointService> webSocketServers = roomSessions.get(roomId);//给所有该房间的人发消息,排出自己,并for(SocketEndPointService webSocketServer: webSocketServers){if(webSocketServer.equals(this)) continue;webSocketServer.session.getAsyncRemote().sendText(JSONUtil.toJsonStr(responseMessage));}}else{//个人发送String receiverId = requestMessage.getReceiver();if(singleSessions.containsKey(receiverId))singleSessions.get(receiverId).session.getAsyncRemote().sendText(JSONUtil.toJsonStr(responseMessage));}}/**** @param session* @param error*/@OnErrorpublic void onError(Session session, Throwable error) {error.printStackTrace();}/*** 实现服务器主动推送*/public void sendMessage(String message) throws IOException {ResponseMessage responseMessage = new ResponseMessage();responseMessage.setSender("admin");responseMessage.setSendTime(new Date());responseMessage.setContent(message);responseMessage.setType("application/text");responseMessage.setMessageId(idWorker.nextId());JSONObject j = new JSONObject(responseMessage);this.session.getBasicRemote().sendText(j.toString());}@Overridepublic boolean equals(Object o) {if (this == o) return true;if (o == null || getClass() != o.getClass()) return false;SocketEndPointService that = (SocketEndPointService) o;return Objects.equals(userId, that.userId) ;}@Overridepublic int hashCode() {return Objects.hash(session, userId, roomId);}
}
方式2 实现 WebSocketConfigurer  通过 WebSocketHandlerRegistry 注册 某个路径 和对应的 消息处理的  TextWebSocketHandler 的实现类
集成  TextWebSocketHandler  进行一些 建立连接 关闭连接 发送消息的操作

public class WebSocketAuthHandler extends TextWebSocketHandler {/*** socket 建立成功事件** @param session* @throws Exception*/@Overridepublic void afterConnectionEstablished(WebSocketSession session) throws Exception {Object token = session.getAttributes().get("token");/* if (token != null) {// 用户连接成功,放入在线用户缓存// WsSessionManager.add(token.toString(), session);} else {throw new RuntimeException("用户登录已经失效!");}*/}/*** 接收消息事件** @param session* @param message* @throws Exception*/@Overrideprotected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {// 获得客户端传来的消息String payload = message.getPayload();Object token = session.getAttributes().get("token");System.out.println("server 接收到 " + token + " 发送的 " + payload);session.sendMessage(new TextMessage("server 发送给 " + token + " 消息 " + payload + " " + LocalDateTime.now().toString()));}/*** socket 断开连接时** @param session* @param status* @throws Exception*/@Overridepublic void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {Object token = session.getAttributes().get("token");if (token != null) {// 用户退出,移除缓存// WsSessionManager.remove(token.toString());}}
}
websocket vue 前端连接
<!DOCTYPE html>
<html><head><meta charset="utf-8"><title>VUE  SINGLE</title><!-- VUE --><script src="https://cdn.jsdelivr.net/npm/vue/dist/vue.js"></script><!-- 引入样式 --><link rel="stylesheet" href="https://unpkg.com/element-ui/lib/theme-chalk/index.css"><!-- 引入组件库 --><script src="https://unpkg.com/element-ui/lib/index.js"></script><script src="https://cdn.bootcss.com/jquery/3.3.1/jquery.js"></script><script type="application/javascript" src="https://cdn.bootcss.com/sockjs-client/1.1.4/sockjs.min.js"></script></head><body><div id="vue_det"><div style="background-color: #b3e19d;width: 1200px;height: 600px;overflow-y:auto"><div v-for="item,index in messagList"  style="width: 1200px;height: 50px;" :class="item.self?'textSelf':'textOther'"> {{item.showMessage}} </div></div><el-form :inline="true"><el-form-item label="输入信息" prop="desc"><el-input type="textarea" v-model="text"></el-input></el-form-item><el-form-item><el-button type="primary" @click="send">发送</el-button></el-form-item></el-form></div>
</body>
<script type="text/javascript">var vm = new Vue({el: '#vue_det',data(){return {userId:"",lockReconnect:false, //防止重复连接text:"",messagList:[]}},mounted () {//测试 用户名 随机var arr= ["a","b","c","d"];var  name = "";for(var x =0 ; x <4;x++){var key = Math.floor(Math.random()*10 /3)name += arr[key];}this.userId = name;// wss 为请求 https 安全协议//非安全的var url = 'http://localhost:28087/socketJs?name='+ this.userIdthis.createWebSocket(url);},beforeDestroy () {this.onbeforeunload()},methods: {createWebSocket(wsUrl) {try {if(typeof(WebSocket) == "undefined") {console.log("您的浏览器不支持WebSocket");}elsethis.websocket = new WebSocket(wsUrl);//初始化this.initWebSocket();} catch(e) {console.log('catch');//异常后重新连接this.reconnect();}},initWebSocket () {// 连接错误this.websocket.onerror = this.setErrorMessage// 连接成功this.websocket.onopen = this.setOnopenMessage// 收到消息的回调this.websocket.onmessage = this.setOnmessageMessage// 连接关闭的回调this.websocket.onclose = this.setOncloseMessage// 监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。window.onbeforeunload = this.onbeforeunload},setErrorMessage () {console.log('WebSocket连接发生错误   状态码:' + this.websocket.readyState)},setOnopenMessage () {console.log('WebSocket连接成功    状态码:' + this.websocket.readyState)//心跳检测重置//this.heartCheck().start();},setOnmessageMessage (event) {// 根据服务器推送的消息做自己的业务处理   {}var obj = JSON.parse( event.data)obj.showMessage  = obj.userId +"  "+ obj.createTime +"\r\n"+obj.message;if(obj.userId === this.userId){obj.self = true;}else{obj.self = false;}this.messagList.push(obj);console.log(obj.showMessage);const h = this.$createElement;this.$notify({title: '标题名称',message: h('i', { style: 'color: teal'}, obj.showMessage)});},setOncloseMessage () {console.log('WebSocket连接关闭    状态码:' + this.websocket.readyState)},onbeforeunload () {this.closeWebSocket()},closeWebSocket () {this.websocket.close()},send(){this.websocket.send(this.text);},reconnect() {if(this.lockReconnect) {return;};this.lockReconnect = true;//没连接上会一直重连,设置延迟避免请求过多tt && clearTimeout(tt);tt = setTimeout(function () {createWebSocket();this.lockReconnect = false;}, 4000);},//心跳检测heartCheck(){var heartCheck = {timeout: 210000,timeoutObj: null,serverTimeoutObj: null,start: function(){var self = this;this.timeoutObj && clearTimeout(this.timeoutObj);this.serverTimeoutObj && clearTimeout(this.serverTimeoutObj);this.timeoutObj = setTimeout(function(){//这里发送一个心跳,后端收到后,返回一个心跳消息,//onmessage拿到返回的心跳就说明连接正常console.log(getNowTime() +' Socket 连接重试');//socket.send("连接成功");self.serverTimeoutObj = setTimeout(function() {}, self.timeout);}, this.timeout)}}return  heartCheck}}});
</script>
<style scoped="">.textOther {text-align: left;background-color: #00a0e9;border: solid 1px black}.textSelf {text-align: right;background-color: #BAC498;border: solid 1px black}
</style>
</html>

WebSocket Stomp 通讯相关推荐

  1. Spring websocket+Stomp+SockJS 实现实时通信 详解

    Spring websocket+Stomp+SockJS 实时通信详解 一.三者之间的关系 Http连接为一次请求(request)一次响应(response),必须为同步调用方式.WebSocke ...

  2. java SSM 框架 多数据源 代码生成器 websocket即时通讯 shiro redis 后台框架源码

    A 调用摄像头拍照,自定义裁剪编辑头像,头像图片色度调节 B 集成代码生成器 [正反双向](单表.主表.明细表.树形表,快速开发利器)+快速表单构建器 freemaker模版技术 ,0个代码不用写,生 ...

  3. 微信小程序使用GoEasy实现websocket实时通讯

    不需要下载安装,便可以在微信好友.微信群之间快速的转发,用户只需要扫码或者在微信里点击,就可以立即运行,有着近似APP的用户体验,使得微信小程序成为全民热爱的好东西~ 同时因为微信小程序使用的是Jav ...

  4. vue+websocket+Stomp组件实现前端长连接

    1文件结构 2.重点文件夹中的文件代码以及作用 ① 根目录中systemConfig文件夹中的main.js文件 /***作用:作为项目中的配置文件,放了websocket连接的url,用户名(adm ...

  5. GPS转换百度地图坐标websocket(stomp)实现动态打点

    序        这会已经下班乐,本来觉得这个好像没有什么新东西.最后决定还是趁现在下班写写是因为要让大家看看单技术组合使用的意义.这里就是在上次springBoot+thymeleaf+layui后 ...

  6. java代码编辑器 pdf文件预览 主流SSM 代码生成器 shrio redis websocket即时通讯

    获取[下载地址] QQ: 313596790 官网 http://www.fhadmin.org/ A 代码编辑器,在线模版编辑,仿开发工具编辑器,pdf在线预览,文件转换编码 B 集成代码生成器 [ ...

  7. 代码生成器 websocket即时通讯 shiro redis 后台框架源码

    获取[下载地址]   QQ: 313596790 A 调用摄像头拍照,自定义裁剪编辑头像,头像图片色度调节 B 集成代码生成器 [正反双向](单表.主表.明细表.树形表,快速开发利器)+快速表单构建器 ...

  8. php实现websocket即时通讯

    php实现websocket即时通讯 系统环境 环境配置 概述 websocket原理 php服务端 web客户端 开启Socket服务器程序 系统环境 centos 7 php5.6 环境配置 ph ...

  9. channels实现websocket实时通讯和消息推送

    Django+channels实现websocket实时通讯@channels Django框架集合channels实现实时通讯和消息推送 channel是Django团队的一个研发的一个给Djang ...

最新文章

  1. XenDesktop vDisk更新
  2. Apache,php,mysql整合安装包 for Windows 2000/xp/2003
  3. An error occurred. connect() failed (111: Connection refused) while connecting to upstream
  4. 机器学习、深度学习、强化学习课程超级大列表!
  5. 深入理解InnoDB(8)—单表访问
  6. SAS在金融中的应用六
  7. java final对象_java面向对象基础_final详细介绍
  8. 迁移至Kubernetes的三种主要方式对比
  9. 在cygwin下使用VC编译器
  10. matlab读取2级文件夹,并把图像保存到指定的文件夹
  11. Visual SourceSafe 2005(Vss2005)使用经验总结
  12. VS 2017 透明背景设置
  13. 【GIT】git 提交代码正确步骤
  14. java 正则表达式 compile_JAVA 正则表达式
  15. php大马程序_php大马是什么
  16. HDP直播个性化设置教程,简单几步提升电视盒子/投影仪观影体验
  17. OOP思想--封装和继承
  18. 局域网上网流量监控_NAT下网络流量监控解决方案
  19. 平台型时间信号强度曲线_MR动态增强扫描时间-信号强度曲线在骨骼肌肉系统肿瘤定性诊断中的价值...
  20. 站内SEO第三篇:降低网站跳出率的7种有效方法

热门文章

  1. 排序算法-简单选择排序
  2. 我相信心能到达的地方,脚步也可以踏至
  3. C# menuStrip 配置
  4. 141:vue+openlayers 测量长度和面积,尾随数字和关闭按钮
  5. 7个常用的网络性能测试指标
  6. Linux0.11操作系统(哈工大李治军老师)实验楼实验2-系统调用
  7. 图像分割中的一些术语,pixel-wise,patch-wise,image-wise
  8. win7电脑一开机就蓝屏怎么回事?常见蓝屏代码
  9. 转:《伤寒玩家攻关秘籍手册》·雪山来客学习伤寒论的体会与临床经验
  10. 大数据剖析:离开互联网大厂的年轻人都去了哪里?