Netty-SocketIO 集群解决方案
Netty-SocketIO 集群解决方案
Netty-SocketIO
作为一个Socket框架,使用非常方便,并且使用Netty开发
性能也有保证。但是,我在使用
Netty-SocketIO
框架时,却发现,国内的资料比较少,虽然有些Demo级别的技术分享,但是关于集群解决方案,并没有什么较好的解决方法。
文章目录
- Netty-SocketIO 集群解决方案
- `Netty-SocketIO`作为一个Socket框架,使用非常方便,并且使用`Netty开发`性能也有保证。
- 但是,我在使用`Netty-SocketIO`框架时,却发现,国内的资料比较少,虽然有些Demo级别的技术分享,但是关于集群解决方案,并没有什么较好的解决方法。
- 所以,博主结合GitHub上的`Issues`,实现了一种集群的解决方案。
- 一. 解决方案原理
- 二.服务端
- 1.版本
- 2.项目结构
- 3.架构\原理图
- 3.代码
- com.lt.push.config
- ClientCache
- EventListenner
- MessagePushConfig
- RedisConfig
- RedisSub
- com.lt.push/controller
- PushMsgReq
- PushController
- entity
- PushMsgEntity
- util
- R
- LtPushApplication
- resources
- application.yml
- application-local.yml
- 三.客户端
- 四.测试
- 五.源码下载
- 小小赞助,谢谢!
所以,博主结合GitHub上的Issues
,实现了一种集群的解决方案。
一. 解决方案原理
使用Redis订阅\发布模式解决,实现多集群间通讯。
注:
1.官方好像推荐使用Redisson来解决集群问题,有兴趣的同学可以试试,我是没试出来。。
2.最新版Redis支持数据流存储,可通过缓存SocketIOClient
来实现,有兴趣的同学可以试试。
二.服务端
使用SpringBoot+Netty-SocketIO+Redis,可通过修改启动端口,模拟多服务端
1.版本
netty-socketio
: 1.7.11
spring-boot-
: 2.2.1.RELEASE
JDK
: 1.8
2.项目结构
3.架构\原理图
3.代码
com.lt.push.config
ClientCache
本地(数据中心)类
package com.lt.push.config;import com.corundumstudio.socketio.SocketIOClient;
import org.springframework.stereotype.Component;import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;/*** @author litong* @date 2019/11/6 16:01*/
@Component
public class ClientCache {/*** 本地缓存*/private static Map<String, HashMap<UUID, SocketIOClient>> concurrentHashMap = new ConcurrentHashMap<>();/*** 存入本地缓存** @param userId 用户ID* @param sessionId 页面sessionID* @param socketIOClient 页面对应的通道连接信息*/public void saveClient(String userId, UUID sessionId, SocketIOClient socketIOClient) {HashMap<UUID, SocketIOClient> sessionIdClientCache = concurrentHashMap.get(userId);if (sessionIdClientCache == null) {sessionIdClientCache = new HashMap<>();}sessionIdClientCache.put(sessionId, socketIOClient);concurrentHashMap.put(userId, sessionIdClientCache);}/*** 根据用户ID获取所有通道信息** @param userId* @return*/public HashMap<UUID, SocketIOClient> getUserClient(String userId) {return concurrentHashMap.get(userId);}/*** 根据用户ID及页面sessionID删除页面链接信息** @param userId* @param sessionId*/public void deleteSessionClient(String userId, UUID sessionId) {concurrentHashMap.get(userId).remove(sessionId);}
}
EventListenner
事件监听/注册中心
package com.lt.push.config;import com.corundumstudio.socketio.AckRequest;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.annotation.OnConnect;
import com.corundumstudio.socketio.annotation.OnDisconnect;
import com.corundumstudio.socketio.annotation.OnEvent;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.UUID;/*** @author litong* @date 2019/11/6 15:59*/
@Component
public class EventListenner {@Resourceprivate ClientCache clientCache;/*** 客户端连接** @param client*/@OnConnectpublic void onConnect(SocketIOClient client) {String userId = client.getHandshakeData().getSingleUrlParam("userId");UUID sessionId = client.getSessionId();clientCache.saveClient(userId, sessionId, client);System.out.println("建立连接");}/*** 客户端断开** @param client*/@OnDisconnectpublic void onDisconnect(SocketIOClient client) {String userId = client.getHandshakeData().getSingleUrlParam("userId");clientCache.deleteSessionClient(userId, client.getSessionId());System.out.println("关闭连接");}//消息接收入口,当接收到消息后,查找发送目标客户端,并且向该客户端发送消息,且给自己发送消息// 暂未使用@OnEvent("messageevent")public void onEvent(SocketIOClient client, AckRequest request) {}
}
MessagePushConfig
Socket线程开启类
package com.lt.push.config;import com.corundumstudio.socketio.Configuration;
import com.corundumstudio.socketio.SocketConfig;
import com.corundumstudio.socketio.SocketIOServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;import javax.annotation.Resource;/*** @author litong* @date 2019/11/1 13:32*/
@Component
@Slf4j
public class MessagePushConfig implements InitializingBean {@Resourceprivate EventListenner eventListenner;@Value("${push.server.port}")private int serverPort;@Autowiredprivate SocketIOServer socketIOServer;@Overridepublic void afterPropertiesSet() throws Exception {socketIOServer.start();System.out.println("启动正常");}@Beanpublic SocketIOServer socketIOServer() {Configuration config = new Configuration();config.setPort(serverPort);SocketConfig socketConfig = new SocketConfig();socketConfig.setReuseAddress(true);socketConfig.setTcpNoDelay(true);socketConfig.setSoLinger(0);config.setSocketConfig(socketConfig);config.setHostname("localhost");SocketIOServer server = new SocketIOServer(config);server.addListeners(eventListenner);return server;}
}
RedisConfig
redis配置订阅\发布模式
package com.lt.push.config;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;/*** @author litong* @date 2019/10/17 14:33*/
@Configuration
public class RedisConfig {@Autowiredprivate RedisTemplate redisTemplate;@Beanpublic RedisTemplate<String, Object> stringSerializerRedisTemplate() {RedisSerializer<String> stringSerializer = new StringRedisSerializer();redisTemplate.setKeySerializer(stringSerializer);redisTemplate.setValueSerializer(stringSerializer);redisTemplate.setHashKeySerializer(stringSerializer);redisTemplate.setHashValueSerializer(stringSerializer);return redisTemplate;}@Bean(destroyMethod = "destroy")public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory factory,MessageListener redisMessageListener) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(factory);//可以添加多个 messageListenercontainer.addMessageListener(redisMessageListener, new PatternTopic("index"));return container;}
}
RedisSub
redis发布配置
package com.lt.push.config;import com.alibaba.fastjson.JSONObject;
import com.corundumstudio.socketio.SocketIOClient;
import com.lt.push.entity.PushMsgEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;import javax.annotation.Resource;
import java.util.HashMap;
import java.util.UUID;/*** @author litong* @date 2019/10/17 15:33*/
@Service(value = "redisMessageListener")
@Slf4j
public class RedisSub implements MessageListener {@Resourceprivate ClientCache clientCache;@Autowiredprivate RedisTemplate<String, Object> redisTemplate;@Overridepublic void onMessage(Message message, byte[] pattern) {String msgString = (String) redisTemplate.getValueSerializer().deserialize(message.getBody());PushMsgEntity pushMsgEntity = JSONObject.parseObject(msgString, PushMsgEntity.class);String channel = (String) redisTemplate.getValueSerializer().deserialize(message.getChannel());if (!StringUtils.isEmpty(channel) && !StringUtils.isEmpty(pushMsgEntity)) {// 向客户端推送消息HashMap<UUID, SocketIOClient> userClient = clientCache.getUserClient(pushMsgEntity.getUid());if (userClient != null && !userClient.isEmpty()) {userClient.forEach((uuid, socketIOClient) -> {socketIOClient.sendEvent("chatevent", pushMsgEntity.getMessage());});}}}
}
com.lt.push/controller
PushMsgReq
Req请求类
package com.lt.push.controller.request;import lombok.Data;/*** @author litong* @date 2019/11/13 16:44*/
@Data
public class PushMsgReq {private String uid;private String msg;
}
PushController
推送消息控制层
package com.lt.push.controller;import com.alibaba.fastjson.JSON;
import com.lt.push.controller.request.PushMsgReq;
import com.lt.push.entity.PushMsgEntity;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @author litong* @date 2019/11/6 16:02*/
@RestController
@RequestMapping("/push")
@AllArgsConstructor
@Slf4j
public class PushController {private RedisTemplate redisTemplate;@PostMapping("/user")public String pushUser(@RequestBody PushMsgReq pushMsgReq) {// 接受消息,存储到MongoDB中,// 发布到redis中PushMsgEntity pushMsgEntity = PushMsgEntity.builder().uid(pushMsgReq.getUid()).message(pushMsgReq.getMsg()).build();redisTemplate.convertAndSend("index", JSON.toJSONString(pushMsgEntity));return "success";}
}
entity
PushMsgEntity
推送实体类
package com.lt.push.entity;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;/*** @author litong* @date 2019/11/12 16:24*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class PushMsgEntity {private String uid;private String message;}
util
R
返回数据格式类
package com.lt.push.util;import lombok.Data;/*** @author litong* @date 2019/11/10 15:15*/
@Data
public class R<T> {private Integer code;private String msg;private T data;public static <T> R<T> ok() {return restResult(null, 1000, "成功");}public static <T> R<T> ok(T data) {return restResult(data, 1000, null);}private static <T> R<T> restResult(T data, int code, String msg) {R<T> apiResult = new R<>();apiResult.setCode(code);apiResult.setData(data);apiResult.setMsg(msg);return apiResult;}
}
LtPushApplication
启动类
package com.lt.push;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class LtPushApplication {public static void main(String[] args) {SpringApplication.run(LtPushApplication.class, args);}}
resources
配置文件
application.yml
spring:application:name: lt-pushprofiles:active: local
application-local.yml
server:port: 8081spring:redis:host: localhostport: 6379password:
# 推送服务、地址端口
push:server:host: localhostport: 8082
三.客户端
使用网页模拟,可自行修改客户端代码,模拟多用户
为了方便,socket.io.js,我直接使用的线上的,可自行下载后放到项目中
端口可根据实际进行修改,在控制台查看推送信息
<script src="https://cdn.bootcss.com/socket.io/2.1.1/socket.io.js"></script><script type="text/javascript">// 此处userId为用户标识,后面推送时使用var socket = io.connect('http://localhost:8082?userId=1', {'reconnection delay' : 2000,'force new connection' : true});socket.on('message', function(data) {// here is your handler on messages from serverconsole.log(data)});socket.on('chatevent', function(data) {// here is your handler on chatevent from serverconsole.log(data)});socket.on('connect', function() {// connection established, now we can send an objects// send json-object to server// '@class' property should be defined and should// equals to full class name.var obj = { '@class' : 'com.sample.SomeClass'}socket.json.send(obj);// send event-object to server// '@class' property is NOT necessary in this casevar event = {}socket.emit('someevent', event);});</script>
四.测试
模拟集群:启动多个Java服务器,端口不一样即可
模拟多客户:副本多个页面,配置ip不一样即可
接口测试用例
POST http://127.0.0.1:8083/push/user
Accept: application/json
Content-Type: application/json{
"uid": "2",
"msg": "这是服务1给2发的"
}
五.源码下载
GitHub地址
觉得不错可以给我点个star/赞。
小小赞助,谢谢!
Netty-SocketIO 集群解决方案相关推荐
- 分布式IM及Netty服务集群解决方案
一.概述 使用netty开发分布式Im,提供分布netty集群解决方案.服务端通过负载均衡策略与服务集群建立连接,消息发送通过服务间集群的通信进行消息转发. 二.集群架构 三.项目地址 https:/ ...
- WebSocket 集群解决方案
欢迎关注方志朋的博客,回复"666"获面试宝典 问题起因 最近做项目时遇到了需要多用户之间通信的问题,涉及到了WebSocket握手请求,以及集群中WebSocket Sessio ...
- 分布式 WebSocket 集群解决方案
作者 | weixin_34194702 来源 | blog.csdn.net/weixin_34194702/article/details/88701309 问题起因 最近做项目时遇到了需要多用户 ...
- tcp out of order解决_分布式集群解决方案 学习笔记
回到目录: OrangeZh:拉勾教育:JAVA高薪训练营 学习技术篇zhuanlan.zhihu.com 介绍 文章内容输出来源:拉勾教育 Java高薪训练营 分布式集群解决方案相关 什么是分布式 ...
- 私有云办公平台大规模集群/企业级集群/小型工作室集群解决方案:NextCloud集群部署方案--NextCloud集群架构设计
原作者:NextCloud文档库 转载来源:https://docs.nextcloud.com/server/11/admin_manual/installation/deployment_reco ...
- Thanos 开源的大规模Prometheus集群解决方案
Thanos 开源的大规模Prometheus集群解决方案 参考文章: (1)Thanos 开源的大规模Prometheus集群解决方案 (2)https://www.cnblogs.com/yx88 ...
- 阿里云服务器(ECS)集群解决方案
阿里云服务器(ECS)集群解决方案 参考文章: (1)阿里云服务器(ECS)集群解决方案 (2)https://www.cnblogs.com/568yscom/p/10769175.html 备忘一 ...
- Nacos高可用集群解决方案-Docker版本
Nacos高可用集群解决方案-Docker版本 参考文章: (1)Nacos高可用集群解决方案-Docker版本 (2)https://www.cnblogs.com/hellxz/p/nacos-c ...
- 高可用性、负载均衡的mysql集群解决方案(data+sql+mgm节点)
高可用性.负载均衡的mysql 集群解决方案 制作人:Dason QQ:623466642 博客:http://dason.blog.51cto.com/ 一.mysql 的市场占有率 二.mysql ...
最新文章
- python学习笔记(3) -- 常用数据类型
- python小技巧-基于python本身
- SpringBoot集成Mybatis(0配置注解版)
- erlang精要(2)-数制
- CF917C. Pollywog
- C++自定义对象如何支持Range-based循环语法
- mvc中嵌入ssrs报表_如何在SSRS报表中过滤多维OLAP多维数据集
- IOS开发-关于自定义TabBar条
- 如果看了这篇文章你还不懂傅里叶变换,那就过来掐死我吧(下)
- 文档协作编辑 ONLYOFFICE 部署和使用教程
- flux架构浅谈:什么数据才应该放store
- JAVA入门_工具类_书籍借阅日期计算
- 安卓逆向助手反编译apk后文件夹为空
- openEuler服务器系统,操作系统openEuler开放源代码、镜像及开发测试环境
- oracle备份数据exp,oracle数据库备份之exp增量备份
- python大神年薪_我程序员年薪 80 万被亲戚鄙视不如在二本教书的博士生?
- ios开发 多人语音聊天_iOS语音提醒开发总结
- 大学计算机python基础_大学计算机python基础课件2015lecture17
- Linux系统命令(电子邮件新闻组)
- TeamViewer被发现用于(检测为)商业用途解决方案(绝对有效)
热门文章
- CMS垃圾回收器和G1垃圾回收器区别
- excel工作簿打开密码破解
- 未明学院:别焦虑,在命运为你安排的时区里,一切都准时
- 杜克大学计算机统计学,杜克大学的统计学专业怎么样?
- django-blog-zinnia添加文本编辑器ckeditor
- java计算机毕业设计BS高校教师考勤系统MyBatis+系统+LW文档+源码+调试部署
- Linux下Redis安装与配置 (yum 软件源下载安装)
- 2021年,企业做好电商的六大关键点
- 解决:npm ERR! code ELIFECYCLE npm ERR! errno 1
- 事件轮询(Event Loop) 宏任务与微任务