RedisCluster模式启动的环境中,通过Redis中的每个连接,都可以访问 cluster nodes 访问到所有的服务器列表以及其所处于的角色(master/slave)。对于RedisCluster来说,在实际运行时,只会访问到其中的master节点,slave既不能用于write操作,也不能进行read。
原有JedisCluster


 
JedisCluster的UML图结果如上图所示,在每次执行JedisCluster相关操作时,都需要通过JedisClusterCommand提供connection来进行,该connection需要根据key来计算出对应的slot,以便可以进行后续redis相关操作。
return new JedisClusterCommand<String>(connectionHandler, maxRedirections) {@Overridepublic String execute(Jedis connection) {return connection.get(key);}
}.run(key);

在JedisClusterCommand.run方法中,会根据slot计算出对应的connection。
为了尽量减少对原有代码的侵入性,我们需要定义线程上下文(ThreadLocal)级别的变量,其中内置了访问的粒度(READ/WRITE),以便访问的为master还是slave Redis数据源。
改进的ZhenJedisCluster

根据jedis连接来获得Cluster结构:
private Map<String, ClusterNodeObject> getClusterNodes(Jedis jedis) {Map<String, ClusterNodeObject> hpToNodeObjectMap = new HashMap<>();String clusterNodesCommand = jedis.clusterNodes();String[] allNodes = clusterNodesCommand.split("\n");for (String allNode : allNodes) {String[] splits = allNode.split(" ");String hostAndPort = splits[1];ClusterNodeObject clusterNodeObject =new ClusterNodeObject(splits[0], splits[1], splits[2].contains("master"), splits[3],Long.parseLong(splits[4]), Long.parseLong(splits[5]), splits[6],splits[7].equalsIgnoreCase("connected"), splits.length == 9 ? splits[8] : null);hpToNodeObjectMap.put(hostAndPort, clusterNodeObject);}return hpToNodeObjectMap;
}

将其整理成可用结构,分出master节点,以及slave节点对应的master节点,区分读写:
Map<String, ZhenJedisPool> masterNodes = new HashMap<>();
for (ClusterNodeObject clusterNodeObject : clusterNodeObjects) {String ipPort = clusterNodeObject.getIpPort();String[] ipPortSplits = ipPort.split(":");HostAndPort hostAndPort = new HostAndPort(ipPortSplits[0], Integer.parseInt(ipPortSplits[1]));setNodeIfNotExist(hostAndPort);if (clusterNodeObject.isMaster()) {ZhenJedisPool zhenJedisPool = new ZhenJedisPool();zhenJedisPool.setWritePool(nodes.get(ipPort));masterNodes.put(clusterNodeObject.getNodeId(), zhenJedisPool);String[] slotSplits = clusterNodeObject.getSlot().split("-");for (int i = Integer.parseInt(slotSplits[0]); i <= Integer.parseInt(slotSplits[1]); i++) {this.slots.put(i, zhenJedisPool);}}
}for (ClusterNodeObject clusterNodeObject : clusterNodeObjects) {if (!clusterNodeObject.isMaster()) {String masterNodeId = clusterNodeObject.getMasterNodeId();ZhenJedisPool zhenJedisPool = masterNodes.get(masterNodeId);zhenJedisPool.getReadPools().add(nodes.get(clusterNodeObject.getIpPort()));}
}

改进的结构中,需要getConnectionFromSlot方法需要调用ZhenJedisClusterInfoCache.getSlotPool来根据slot以及当前读写状态(read/write)来获取对应的Jedis连接:
public JedisPool getSlotPool(int slot, ZhenQueryContext queryContext) {r.lock();try {ZhenJedisPool zhenJedisPool = slots.get(slot);if (queryContext.getOperationType() == OperationType.WRITE) {return zhenJedisPool.getWritePool();} else {List<JedisPool> readPools = zhenJedisPool.getReadPools();return readPools.get(new Random().nextInt(readPools.size()));}} finally {r.unlock();}
}

 
对于JedisCluster中,在执行每一步操作之前,都需要设置对应的读写上下文,便于在内部选择master/slave connection:
@Override
public String get(final String key) {ZhenQueryContextHolder.getInstance().setQueryContext(new ZhenQueryContext(OperationType.READ));return new ZhenJedisClusterCommand<String>(connectionHandler, maxRedirections) {@Overridepublic String execute(Jedis connection) {return connection.get(key);}}.run(key);
}

处理完成后,只需要在执行时使用我们提供的JedisCluster即可正常运行。
执行验证
当前节点如果为slave,也不能只读,需要额外设置属性 slave-read-only


 
可以证实,经过改造后确实调用到了指定的master节点上:
5974ed7dd81c112d9a2354a0a985995913b4702c 192.168.1.137:6389 master - 0 1470273087539 26 connected 0-5640
d08dc883ee4fcb90c4bb47992ee03e6474398324 192.168.1.137:6390 master - 0 1470273086034 25 connected 5641-11040
ffb4db4e1ced0f91ea66cd2335f7e4eadc29fd56 192.168.1.138:6390 slave 5974ed7dd81c112d9a2354a0a985995913b4702c 0 1470273087539 26 connected
c69b521a30336caf8bce078047cf9bb5f37363ee 192.168.1.137:6388 master - 0 1470273086536 28 connected 11041-16383
532e58842d001f8097fadc325bdb5541b788a360 192.168.1.138:6389 slave c69b521a30336caf8bce078047cf9bb5f37363ee 0 1470273086034 28 connected
aa52c7810e499d042e94e0aa4bc28c57a1da74e3 192.168.1.138:6388 myself,slave d08dc883ee4fcb90c4bb47992ee03e6474398324 0 0 19 connected

出现该问题,加上slave-readonly yes 参数后,重启发现也并没有什么作用,仍然报上面的错误,而且直接通过命令行连接时,仍然出现问题:
192.168.1.137:6390> get key1
-> Redirected to slot [9189] located at 192.168.1.138:6388
"value1"

经过查找,在github上发现问题所在,https://github.com/antirez/redis/issues/2202,如果连接到slave节点,可以通过readonly来进行处理:
//如果是只读连接 { connection.readonly();
}
return execute(connection);

使用zookeeper监测集群状态变化


 
建立Redis agent,用于监测RedisCluster的状态变化,如果RedisCluster中的状态与zookeeper上的一致,不进行任何操作,否则更新zookeeper上的文件。
agent的执行时间间隔可以控制在1s,便于及时发现rediscluster的状态问题。
应用程序中需要注册监听该文件的变化,如果有变化及时进行更新redis读写池。
zookeeper agent
操作zookeeper可以使用curator框架,Curator框架提供了一套高级的API, 简化了ZooKeeper的操作。 它增加了很多使用ZooKeeper开发的特性,可以处理ZooKeeper集群复杂的连接管理和重试机制。
ZooKeeper原生的API支持通过注册Watcher来进行事件监听,但是Watcher通知是一次性的,因此开发过程中需要反复注册Watcher,比较繁琐。Curator引入了Cache来监听ZooKeeper服务端的事件。Cache对ZooKeeper事件监听进行了封装,能够自动处理反复注册监听,简化了ZooKeeper原生API繁琐的开发过程。
关于curator的基本介绍,可以参考:http://ifeve.com/zookeeper-curato-framework/
首先需要添加maven依赖:
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>2.7.0</version>
</dependency>

在主线程中进行间隔1s的轮询,查询zookeeper上的文件与当前redis状态,如果相同不做任何修改,否则进行更新。
建立zookeeper连接:
client = CuratorFrameworkFactory.newClient("xxx",new RetryNTimes(5, 5000));
client.start();

获取zookeeper上的值与线上redis环境进行比对:
public void compareAndSet() throws Exception {List<ZhenJedisPoolObject> jedisPoolFromCluster = getJedisPoolFromCluster();String currentString = JSON.toJSONString(jedisPoolFromCluster);if (client.checkExists().forPath(TOPO_PATH) == null) {SysOutLogger.info("Start to create zk node: " + TOPO_PATH);client.create().creatingParentsIfNeeded().forPath(TOPO_PATH, currentString.getBytes());} else {String statData = new String(client.getData().forPath(TOPO_PATH));if (!currentString.equalsIgnoreCase(statData)) {SysOutLogger.info("Node not synchronized with online, to reset...");client.setData().forPath(TOPO_PATH, currentString.getBytes());}}
}

应用端监测文件修改
而在应用端,完全依赖zookeeper上的文件状态变更,来更新rediscluster中的slots,nodes等对象:
String content = new String(client.getData().forPath(TOPO_PATH), "UTF-8");
List<ZhenJedisPoolObject> zhenJedisPoolObjects =JSON.parseObject(content, new TypeReference<List<ZhenJedisPoolObject>>() {});
discoverClusterNodesAndSlots(zhenJedisPoolObjects);final NodeCache nodeCache = new NodeCache(client, TOPO_PATH, false);
nodeCache.start();
nodeCache.getListenable().addListener(new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {String content = new String(nodeCache.getCurrentData().getData(), "UTF-8");List<ZhenJedisPoolObject> zhenJedisPoolObjects =JSON.parseObject(content, new TypeReference<List<ZhenJedisPoolObject>>() {});discoverClusterNodesAndSlots(zhenJedisPoolObjects);}
});

注意这里需要使用到curator中的NodeCache来操作,它可以帮助监听zookeeper上节点数据的变化。如果想要监听zookeeper上路径的变化,可以使用:PathChildrenCache,根据对应的事件类型event type:CHILD_ADDED, CHILD_REMOVED, CHILD_UPDATED来进行事件处理。
注意需要保证zookeeper上的redis连接,能够以正常的方式访问到(内外网切换)。 

转载于:https://www.cnblogs.com/mmaa/p/5789846.html

RedisCluster读写分离改造相关推荐

  1. redis读写分离 java_spring-data-redis读写分离

    在对Redis进行性能优化时,一直想对Redis进行读写分离.但由于项目底层采用spring-data-redis对redis进行操作,参考spring官网却发现spring-data-redis目前 ...

  2. Sharding-JDBC教程:Spring Boot整合Sharding-JDBC实现数据分表+读写分离

    点击上方"方志朋",选择"置顶公众号" 技术文章第一时间送达! 读写分离 在上一篇文章介绍了如何使用Sharing-JDBC实现数据库的读写分离.读写分离的好处 ...

  3. hibernate oracle 读写分离_利用FDW进行ORACLE到Postgresql的数据迁移

    随着开源数据库技术的发展和去"O"工作的推进,越来越多企业生产系统选择使用Postgresql数据库.Pgsql采用多进程结构,其存储过程.函数的支持好于mysql.个人认为pgs ...

  4. 读写分离原来这么简单,一个小注解就够了

    目录 前言 环境部署 开始项目 目录结构 建表 主从数据源配置 设置路由 数据源的注解 aop切换数据源 最后 前言 相信有经验的同学都清楚,当db的读写量过高时,我们会备份一份或多份的从库用于做数据 ...

  5. 利用Mycat中间件实现RDS MySQL的分库分表及读写分离功能

    https://aws.amazon.com/cn/blogs/china/mycat-rds-mysql/ 随着移动互联网的兴起和大数据的蓬勃发展,系统的数据量正呈几何倍数增长,系统的压力也越来越大 ...

  6. 简单好用!利用Spring AOP技术10分钟实现一个数据库读写分离方案

    前言 最近我们的APP在线用户越来越多,接口的响应速度也是越来越慢,经过运维排查发现是由于并发查询太多导致的数据库压力比较大,架构师经过调研给出了数据库读写分离的解决方案,为了快速解决问题,我们最终采 ...

  7. EF通用数据层封装类(支持读写分离,一主多从)

    浅谈orm 记得四年前在学校第一次接触到 Ling to Sql,那时候瞬间发现不用手写sql语句是多么的方便,后面慢慢的接触了许多orm框架,像 EF,Dapper,Hibernate,Servic ...

  8. Discuz!NT数据库读写分离方案

    目前在Discuz!NT这个产品中,数据库作为数据持久化工具,必定在并发访问频繁且负载压力较大的情况下成 为系统性能的'瓶颈'.即使使用本地缓存等方式来解决频繁访问数据库的问题,但仍旧会有大量的并发请 ...

  9. SQL Server实现读写分离提高系统并发

    转自:http://www.canway.net/Lists/CanwayOriginalArticels/DispForm.aspx?ID=666 在一些大型的网站或者应用中,单台的SQL Serv ...

最新文章

  1. 资源 | 12月机器学习TOP 10文章,错过的快补课
  2. 【逆向】UE4 渲染流程分析
  3. SnapGene 4.3.6 win 中文完美不闪退
  4. 安装mysql需要配置什么软件_软件配置篇-MySQL下载及安装
  5. 想学人工智能从哪入手?
  6. 股票卖出以后可以立即把钱转出吗?
  7. AI 六十年,强人工智能何时到来?
  8. Django 模板层
  9. wps画流程图交叉弧形_word流程图-WPS绘制流程图的正确打开方式,超级简单
  10. 17-统一网关Gateway
  11. [TCP灵魂之问]介绍一下 TCP 报文头部的字段
  12. MacOS Catalina Beta使用体验
  13. android 头像球_【Android 界面效果44】Android之圆头像实例
  14. 微信改名服务器升级,公众号赞赏升级后,微信宣布可修改一次赞赏账户名称
  15. 共享充电宝有什么好拍? | 拍者手记
  16. 陈老师排课12A(普通新课表)排课方法
  17. 【问链-EOS公开课】第十五课 用cleos注册EOS主网账户、投票和发币
  18. 欢迎使用CSDN-markdown编辑器 新人报道
  19. 文献略读-JHM(月桂基防治煤粉尘微观机理)
  20. 《Spring in Action》第4章-Spring Security

热门文章

  1. 慢查询优化,我终于在生产踩到了这个坑!!
  2. 同事把 Redis用成这鬼样子,真坑!
  3. 八种 WebSocket 框架的性能比较
  4. 6.MYSQL视图的使用和管理
  5. Java 分割字符串的方法String.split()底层原理
  6. mysql注入绕过单引号_SQL注入-绕过过滤规则
  7. erp沙盘模拟软件_VOL.977 工商第九周周报 ERP沙盘模拟大赛排名第一 跨学科校企合作商讨筹备...
  8. python怎么输出图像测试_python pyautogui-不检测图像时的位置打印问题
  9. 成功解决xgboost.core.XGBoostError: b'[20:58:45] C:\\Users\\Administrator\\Desktop\\xgboost\\dmlc-core\\s
  10. 成功解决ValueError: Found input variables with inconsistent numbers of samples: [86, 891]