Netty-SocketIO 集群解决方案

Netty-SocketIO作为一个Socket框架,使用非常方便,并且使用Netty开发性能也有保证。

但是,我在使用Netty-SocketIO框架时,却发现,国内的资料比较少,虽然有些Demo级别的技术分享,但是关于集群解决方案,并没有什么较好的解决方法。

文章目录

  • Netty-SocketIO 集群解决方案
        • `Netty-SocketIO`作为一个Socket框架,使用非常方便,并且使用`Netty开发`性能也有保证。
        • 但是,我在使用`Netty-SocketIO`框架时,却发现,国内的资料比较少,虽然有些Demo级别的技术分享,但是关于集群解决方案,并没有什么较好的解决方法。
      • 所以,博主结合GitHub上的`Issues`,实现了一种集群的解决方案。
    • 一. 解决方案原理
    • 二.服务端
      • 1.版本
      • 2.项目结构
      • 3.架构\原理图
      • 3.代码
        • com.lt.push.config
          • ClientCache
          • EventListenner
          • MessagePushConfig
          • RedisConfig
          • RedisSub
        • com.lt.push/controller
          • PushMsgReq
          • PushController
        • entity
          • PushMsgEntity
        • util
          • R
        • LtPushApplication
        • resources
          • application.yml
          • application-local.yml
    • 三.客户端
    • 四.测试
    • 五.源码下载
      • 小小赞助,谢谢!

所以,博主结合GitHub上的Issues,实现了一种集群的解决方案。

一. 解决方案原理

使用Redis订阅\发布模式解决,实现多集群间通讯。

注:
1.官方好像推荐使用Redisson来解决集群问题,有兴趣的同学可以试试,我是没试出来。。
2.最新版Redis支持数据流存储,可通过缓存SocketIOClient来实现,有兴趣的同学可以试试。

二.服务端

使用SpringBoot+Netty-SocketIO+Redis,可通过修改启动端口,模拟多服务端

1.版本

netty-socketio: 1.7.11

spring-boot-: 2.2.1.RELEASE

JDK: 1.8

2.项目结构

3.架构\原理图

3.代码

com.lt.push.config

ClientCache

本地(数据中心)类

package com.lt.push.config;import com.corundumstudio.socketio.SocketIOClient;
import org.springframework.stereotype.Component;import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;/*** @author litong* @date 2019/11/6 16:01*/
@Component
public class ClientCache {/*** 本地缓存*/private static Map<String, HashMap<UUID, SocketIOClient>> concurrentHashMap = new ConcurrentHashMap<>();/*** 存入本地缓存** @param userId         用户ID* @param sessionId      页面sessionID* @param socketIOClient 页面对应的通道连接信息*/public void saveClient(String userId, UUID sessionId, SocketIOClient socketIOClient) {HashMap<UUID, SocketIOClient> sessionIdClientCache = concurrentHashMap.get(userId);if (sessionIdClientCache == null) {sessionIdClientCache = new HashMap<>();}sessionIdClientCache.put(sessionId, socketIOClient);concurrentHashMap.put(userId, sessionIdClientCache);}/*** 根据用户ID获取所有通道信息** @param userId* @return*/public HashMap<UUID, SocketIOClient> getUserClient(String userId) {return concurrentHashMap.get(userId);}/*** 根据用户ID及页面sessionID删除页面链接信息** @param userId* @param sessionId*/public void deleteSessionClient(String userId, UUID sessionId) {concurrentHashMap.get(userId).remove(sessionId);}
}
EventListenner

事件监听/注册中心

package com.lt.push.config;import com.corundumstudio.socketio.AckRequest;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.annotation.OnConnect;
import com.corundumstudio.socketio.annotation.OnDisconnect;
import com.corundumstudio.socketio.annotation.OnEvent;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.UUID;/*** @author litong* @date 2019/11/6 15:59*/
@Component
public class EventListenner {@Resourceprivate ClientCache clientCache;/*** 客户端连接** @param client*/@OnConnectpublic void onConnect(SocketIOClient client) {String userId = client.getHandshakeData().getSingleUrlParam("userId");UUID sessionId = client.getSessionId();clientCache.saveClient(userId, sessionId, client);System.out.println("建立连接");}/*** 客户端断开** @param client*/@OnDisconnectpublic void onDisconnect(SocketIOClient client) {String userId = client.getHandshakeData().getSingleUrlParam("userId");clientCache.deleteSessionClient(userId, client.getSessionId());System.out.println("关闭连接");}//消息接收入口,当接收到消息后,查找发送目标客户端,并且向该客户端发送消息,且给自己发送消息// 暂未使用@OnEvent("messageevent")public void onEvent(SocketIOClient client, AckRequest request) {}
}
MessagePushConfig

Socket线程开启类

package com.lt.push.config;import com.corundumstudio.socketio.Configuration;
import com.corundumstudio.socketio.SocketConfig;
import com.corundumstudio.socketio.SocketIOServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;import javax.annotation.Resource;/*** @author litong* @date 2019/11/1 13:32*/
@Component
@Slf4j
public class MessagePushConfig implements InitializingBean {@Resourceprivate EventListenner eventListenner;@Value("${push.server.port}")private int serverPort;@Autowiredprivate SocketIOServer socketIOServer;@Overridepublic void afterPropertiesSet() throws Exception {socketIOServer.start();System.out.println("启动正常");}@Beanpublic SocketIOServer socketIOServer() {Configuration config = new Configuration();config.setPort(serverPort);SocketConfig socketConfig = new SocketConfig();socketConfig.setReuseAddress(true);socketConfig.setTcpNoDelay(true);socketConfig.setSoLinger(0);config.setSocketConfig(socketConfig);config.setHostname("localhost");SocketIOServer server = new SocketIOServer(config);server.addListeners(eventListenner);return server;}
}
RedisConfig

redis配置订阅\发布模式

package com.lt.push.config;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;/*** @author litong* @date 2019/10/17 14:33*/
@Configuration
public class RedisConfig {@Autowiredprivate RedisTemplate redisTemplate;@Beanpublic RedisTemplate<String, Object> stringSerializerRedisTemplate() {RedisSerializer<String> stringSerializer = new StringRedisSerializer();redisTemplate.setKeySerializer(stringSerializer);redisTemplate.setValueSerializer(stringSerializer);redisTemplate.setHashKeySerializer(stringSerializer);redisTemplate.setHashValueSerializer(stringSerializer);return redisTemplate;}@Bean(destroyMethod = "destroy")public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory factory,MessageListener redisMessageListener) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(factory);//可以添加多个 messageListenercontainer.addMessageListener(redisMessageListener, new PatternTopic("index"));return container;}
}
RedisSub

redis发布配置

package com.lt.push.config;import com.alibaba.fastjson.JSONObject;
import com.corundumstudio.socketio.SocketIOClient;
import com.lt.push.entity.PushMsgEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;import javax.annotation.Resource;
import java.util.HashMap;
import java.util.UUID;/*** @author litong* @date 2019/10/17 15:33*/
@Service(value = "redisMessageListener")
@Slf4j
public class RedisSub implements MessageListener {@Resourceprivate ClientCache clientCache;@Autowiredprivate RedisTemplate<String, Object> redisTemplate;@Overridepublic void onMessage(Message message, byte[] pattern) {String msgString = (String) redisTemplate.getValueSerializer().deserialize(message.getBody());PushMsgEntity pushMsgEntity = JSONObject.parseObject(msgString, PushMsgEntity.class);String channel = (String) redisTemplate.getValueSerializer().deserialize(message.getChannel());if (!StringUtils.isEmpty(channel) && !StringUtils.isEmpty(pushMsgEntity)) {// 向客户端推送消息HashMap<UUID, SocketIOClient> userClient = clientCache.getUserClient(pushMsgEntity.getUid());if (userClient != null && !userClient.isEmpty()) {userClient.forEach((uuid, socketIOClient) -> {socketIOClient.sendEvent("chatevent", pushMsgEntity.getMessage());});}}}
}

com.lt.push/controller

PushMsgReq

Req请求类

package com.lt.push.controller.request;import lombok.Data;/*** @author litong* @date 2019/11/13 16:44*/
@Data
public class PushMsgReq {private String uid;private String msg;
}
PushController

推送消息控制层

package com.lt.push.controller;import com.alibaba.fastjson.JSON;
import com.lt.push.controller.request.PushMsgReq;
import com.lt.push.entity.PushMsgEntity;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @author litong* @date 2019/11/6 16:02*/
@RestController
@RequestMapping("/push")
@AllArgsConstructor
@Slf4j
public class PushController {private RedisTemplate redisTemplate;@PostMapping("/user")public String pushUser(@RequestBody PushMsgReq pushMsgReq) {// 接受消息,存储到MongoDB中,// 发布到redis中PushMsgEntity pushMsgEntity = PushMsgEntity.builder().uid(pushMsgReq.getUid()).message(pushMsgReq.getMsg()).build();redisTemplate.convertAndSend("index", JSON.toJSONString(pushMsgEntity));return "success";}
}

entity

PushMsgEntity

推送实体类

package com.lt.push.entity;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;/*** @author litong* @date 2019/11/12 16:24*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class PushMsgEntity {private String uid;private String message;}

util

R

返回数据格式类

package com.lt.push.util;import lombok.Data;/*** @author litong* @date 2019/11/10 15:15*/
@Data
public class R<T> {private Integer code;private String msg;private T data;public static <T> R<T> ok() {return restResult(null, 1000, "成功");}public static <T> R<T> ok(T data) {return restResult(data, 1000, null);}private static <T> R<T> restResult(T data, int code, String msg) {R<T> apiResult = new R<>();apiResult.setCode(code);apiResult.setData(data);apiResult.setMsg(msg);return apiResult;}
}

LtPushApplication

启动类

package com.lt.push;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class LtPushApplication {public static void main(String[] args) {SpringApplication.run(LtPushApplication.class, args);}}

resources

配置文件

application.yml
spring:application:name: lt-pushprofiles:active: local
application-local.yml
server:port: 8081spring:redis:host: localhostport: 6379password:
# 推送服务、地址端口
push:server:host: localhostport: 8082

三.客户端

使用网页模拟,可自行修改客户端代码,模拟多用户

为了方便,socket.io.js,我直接使用的线上的,可自行下载后放到项目中

端口可根据实际进行修改,在控制台查看推送信息

<script src="https://cdn.bootcss.com/socket.io/2.1.1/socket.io.js"></script><script type="text/javascript">// 此处userId为用户标识,后面推送时使用var socket = io.connect('http://localhost:8082?userId=1', {'reconnection delay' : 2000,'force new connection' : true});socket.on('message', function(data) {// here is your handler on messages from serverconsole.log(data)});socket.on('chatevent', function(data) {// here is your handler on chatevent from serverconsole.log(data)});socket.on('connect', function() {// connection established, now we can send an objects// send json-object to server// '@class' property should be defined and should// equals to full class name.var obj = { '@class' : 'com.sample.SomeClass'}socket.json.send(obj);// send event-object to server// '@class' property is NOT necessary in this casevar event = {}socket.emit('someevent', event);});</script>

四.测试

模拟集群:启动多个Java服务器,端口不一样即可

模拟多客户:副本多个页面,配置ip不一样即可

接口测试用例

POST http://127.0.0.1:8083/push/user
Accept: application/json
Content-Type: application/json{
"uid": "2",
"msg": "这是服务1给2发的"
}

五.源码下载

GitHub地址
觉得不错可以给我点个star/赞。

小小赞助,谢谢!

Netty-SocketIO 集群解决方案相关推荐

  1. 分布式IM及Netty服务集群解决方案

    一.概述 使用netty开发分布式Im,提供分布netty集群解决方案.服务端通过负载均衡策略与服务集群建立连接,消息发送通过服务间集群的通信进行消息转发. 二.集群架构 三.项目地址 https:/ ...

  2. WebSocket 集群解决方案

    欢迎关注方志朋的博客,回复"666"获面试宝典 问题起因 最近做项目时遇到了需要多用户之间通信的问题,涉及到了WebSocket握手请求,以及集群中WebSocket Sessio ...

  3. 分布式 WebSocket 集群解决方案

    作者 | weixin_34194702 来源 | blog.csdn.net/weixin_34194702/article/details/88701309 问题起因 最近做项目时遇到了需要多用户 ...

  4. tcp out of order解决_分布式集群解决方案 学习笔记

    回到目录: OrangeZh:拉勾教育:JAVA高薪训练营 学习技术篇​zhuanlan.zhihu.com 介绍 文章内容输出来源:拉勾教育 Java高薪训练营 分布式集群解决方案相关 什么是分布式 ...

  5. 私有云办公平台大规模集群/企业级集群/小型工作室集群解决方案:NextCloud集群部署方案--NextCloud集群架构设计

    原作者:NextCloud文档库 转载来源:https://docs.nextcloud.com/server/11/admin_manual/installation/deployment_reco ...

  6. Thanos 开源的大规模Prometheus集群解决方案

    Thanos 开源的大规模Prometheus集群解决方案 参考文章: (1)Thanos 开源的大规模Prometheus集群解决方案 (2)https://www.cnblogs.com/yx88 ...

  7. 阿里云服务器(ECS)集群解决方案

    阿里云服务器(ECS)集群解决方案 参考文章: (1)阿里云服务器(ECS)集群解决方案 (2)https://www.cnblogs.com/568yscom/p/10769175.html 备忘一 ...

  8. Nacos高可用集群解决方案-Docker版本

    Nacos高可用集群解决方案-Docker版本 参考文章: (1)Nacos高可用集群解决方案-Docker版本 (2)https://www.cnblogs.com/hellxz/p/nacos-c ...

  9. 高可用性、负载均衡的mysql集群解决方案(data+sql+mgm节点)

    高可用性.负载均衡的mysql 集群解决方案 制作人:Dason QQ:623466642 博客:http://dason.blog.51cto.com/ 一.mysql 的市场占有率 二.mysql ...

最新文章

  1. python学习笔记(3) -- 常用数据类型
  2. python小技巧-基于python本身
  3. SpringBoot集成Mybatis(0配置注解版)
  4. erlang精要(2)-数制
  5. CF917C. Pollywog
  6. C++自定义对象如何支持Range-based循环语法
  7. mvc中嵌入ssrs报表_如何在SSRS报表中过滤多维OLAP多维数据集
  8. IOS开发-关于自定义TabBar条
  9. 如果看了这篇文章你还不懂傅里叶变换,那就过来掐死我吧(下)
  10. 文档协作编辑 ONLYOFFICE 部署和使用教程
  11. flux架构浅谈:什么数据才应该放store
  12. JAVA入门_工具类_书籍借阅日期计算
  13. 安卓逆向助手反编译apk后文件夹为空
  14. openEuler服务器系统,操作系统openEuler开放源代码、镜像及开发测试环境
  15. oracle备份数据exp,oracle数据库备份之exp增量备份
  16. python大神年薪_我程序员年薪 80 万被亲戚鄙视不如在二本教书的博士生?
  17. ios开发 多人语音聊天_iOS语音提醒开发总结
  18. 大学计算机python基础_大学计算机python基础课件2015lecture17
  19. Linux系统命令(电子邮件新闻组)
  20. TeamViewer被发现用于(检测为)商业用途解决方案(绝对有效)

热门文章

  1. CMS垃圾回收器和G1垃圾回收器区别
  2. excel工作簿打开密码破解
  3. 未明学院:别焦虑,在命运为你安排的时区里,一切都准时
  4. 杜克大学计算机统计学,杜克大学的统计学专业怎么样?
  5. django-blog-zinnia添加文本编辑器ckeditor
  6. java计算机毕业设计BS高校教师考勤系统MyBatis+系统+LW文档+源码+调试部署
  7. Linux下Redis安装与配置 (yum 软件源下载安装)
  8. 2021年,企业做好电商的六大关键点
  9. 解决:npm ERR! code ELIFECYCLE npm ERR! errno 1
  10. 事件轮询(Event Loop) 宏任务与微任务