我们在java项目里面连接已经搭建好的Codis集群时,需要用到其java客户端——Jodis。这一篇我们就来看看Jodis是如何操作对Codis集群进行操作的。

import io.codis.jodis.JedisResourcePool;
import io.codis.jodis.RoundRobinJedisPool;
import redis.clients.jedis.Jedis;
/*** @author wujiang* @version 1.0.0* @date 2017/7/12*/
public class CodisDemo {public static void main(String[] args) {JedisResourcePool jedisPool = RoundRobinJedisPool.create().curatorClient("10.0.2.15:2181", 30000).zkProxyDir("/jodis/codis-wujiang").build();try (Jedis jedis = jedisPool.getResource()) {//省略代码}}
}

如果在运行中遇到报错

Redis.clients.jedis.exceptions.JedisException: Proxy list empty

可以参考之前的一篇博客Jodis报错- JedisException- Proxy list empty进行解决。

Jodis中最重要的一个类就是RoundRobinJedisPool。首先来看看这个类的继承结构和方法

在RoundRobinJedisPool.class中有一个内部类Builder,是用来与zk集群进行连接的。在build之前,一直都是在设置Builder的属性,例如将Builder的connectionTimeoutMs和soTimeoutMs分别设置为2000,database设置为0,传入zk地址和路径等等。然后会调用下面的build方法

public static final class Builder {private CuratorFramework curatorClient;private boolean closeCurator;private String zkProxyDir;private String zkAddr;private int zkSessionTimeoutMs;private JedisPoolConfig poolConfig;private int connectionTimeoutMs;private int soTimeoutMs;private String password;private int database;private String clientName;.public RoundRobinJedisPool build() {this.validate();return new RoundRobinJedisPool(this.curatorClient, this.closeCurator, this.zkProxyDir, this.poolConfig, this.connectionTimeoutMs, this.soTimeoutMs, this.password, this.database, this.clientName, null);}.private void validate() {//首先检查zkProxyDir和zkAddr是否为空Preconditions.checkNotNull(this.zkProxyDir, "zkProxyDir can not be null");//初始化curatorClient并启动if (this.curatorClient == null) {Preconditions.checkNotNull(this.zkAddr, "zk client can not be null");this.curatorClient = CuratorFrameworkFactory.builder().connectString(this.zkAddr).sessionTimeoutMs(this.zkSessionTimeoutMs).retryPolicy(new BoundedExponentialBackoffRetryUntilElapsed(100, 30000, -1L)).build();this.curatorClient.start();this.closeCurator = true;} else if (this.curatorClient.getState() == CuratorFrameworkState.LATENT) {this.curatorClient.start();}if (this.poolConfig == null) {this.poolConfig = new JedisPoolConfig();}}
}

初始化的CuratorClient如下所示,curator-client组件可以作为zookeeper client来使用,它提供了zk实例创建/重连机制等

最关键的是后面创建RoundRobinJedisPool这一步。

private RoundRobinJedisPool(CuratorFramework curatorClient, boolean closeCurator, String zkProxyDir, JedisPoolConfig poolConfig, int connectionTimeoutMs, int soTimeoutMs, String password, int database, String clientName) {//新建ImmutableList<PooledObject>,每个PooledObject包含了proxy addr以及jedisPoolthis.pools = ImmutableList.of();this.nextIdx = new AtomicInteger(-1);this.poolConfig = poolConfig;this.connectionTimeoutMs = connectionTimeoutMs;this.soTimeoutMs = soTimeoutMs;this.password = password;this.database = database;this.clientName = clientName;this.curatorClient = curatorClient;//truethis.closeCurator = closeCurator;this.watcher = new PathChildrenCache(curatorClient, zkProxyDir, true);//监听zkProxyDir下面的变化,也就是当集群中新加入proxy或者有proxy宕机之后,都会由watcher得到该消息this.watcher.getListenable().addListener(new PathChildrenCacheListener() {private void logEvent(PathChildrenCacheEvent event) {StringBuilder msg = new StringBuilder("Receive child event: ");msg.append("type=").append(event.getType());ChildData data = event.getData();if (data != null) {msg.append(", path=").append(data.getPath());msg.append(", stat=").append(data.getStat());if (data.getData() != null) {msg.append(", bytes length=").append(data.getData().length);} else {msg.append(", no bytes");}} else {msg.append(", no data");}RoundRobinJedisPool.LOG.info(msg.toString());}public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {this.logEvent(event);//如果zk下监听事件类型为添加、更新、删除(这是通过静态代码块注入的),就重置poolif (RoundRobinJedisPool.RESET_TYPES.contains(event.getType())) {RoundRobinJedisPool.this.resetPools();}}});try {//以BUILD_INITIAL_CACHE模式启动watcherthis.watcher.start(StartMode.BUILD_INITIAL_CACHE);} catch (Exception var11) {this.close();throw new JedisException(var11);}this.resetPools();
}
//启动watcher的方法
public void rebuild() throws Exception {Preconditions.checkState(!this.executorService.isShutdown(), "cache has been closed");this.ensurePath();this.clear();///jodis/codis-wujiang下面的proxy,本例中只有一个proxy-00e6b8c9d5cc40ed5f81ea515b2b7695List<String> children = (List)this.client.getChildren().forPath(this.path);Iterator var2 = children.iterator();while(var2.hasNext()) {String child = (String)var2.next();///jodis/codis-wujiang/proxy-00e6b8c9d5cc40ed5f81ea515b2b7695String fullPath = ZKPaths.makePath(this.path, child);this.internalRebuildNode(fullPath);if (this.rebuildTestExchanger != null) {this.rebuildTestExchanger.exchange(new Object());}}this.offerOperation(new RefreshOperation(this, PathChildrenCache.RefreshMode.FORCE_GET_DATA_AND_STAT));}

下一步,resetPool,根据zk中的proxy以及对应的jedis变化,更新集群中的redisPool。这样pool中永远都有每个proxy以及proxy中对应的jedisPool。

注意,下面的addr2Pool是RoundRobinJedisPool当前的pool,从zk当前的数据中取出现在的所有proxy,从addr2Pool中删除,并放入builder中。如果删除的时候为null,证明原来的pool中并没有这个proxy连接,于是新建一个并放到builder中。最后的RoundRobinJedisPool.pool相当于是builder.build出来的。再将addr2Pool中剩余的proxy连接关闭(这里用了一个trick,JedisPool实际上是proxy连接)

private static final class PooledObject {//addr是proxy的19000端口地址public final String addr;public final JedisPool pool;public PooledObject(String addr, JedisPool pool) {this.addr = addr;this.pool = pool;}
}private void resetPools() {ImmutableList<RoundRobinJedisPool.PooledObject> pools = this.pools;Map<String, RoundRobinJedisPool.PooledObject> addr2Pool = Maps.newHashMapWithExpectedSize(pools.size());UnmodifiableIterator var3 = pools.iterator();while(var3.hasNext()) {RoundRobinJedisPool.PooledObject pool = (RoundRobinJedisPool.PooledObject)var3.next();addr2Pool.put(pool.addr, pool);}com.google.common.collect.ImmutableList.Builder<RoundRobinJedisPool.PooledObject> builder = ImmutableList.builder();Iterator var14 = this.watcher.getCurrentData().iterator();while(var14.hasNext()) {ChildData childData = (ChildData)var14.next();try {//封装了proxy addr(19000端口)以及state(是否在线)CodisProxyInfo proxyInfo = (CodisProxyInfo)MAPPER.readValue(childData.getData(), CodisProxyInfo.class);if ("online".equals(proxyInfo.getState())) {String addr = proxyInfo.getAddr();RoundRobinJedisPool.PooledObject pool = (RoundRobinJedisPool.PooledObject)addr2Pool.remove(addr);//第一次进来的时候,pool为null,会执行下面的pool的构造方法if (pool == null) {LOG.info("Add new proxy: " + addr);String[] hostAndPort = addr.split(":");String host = hostAndPort[0];int port = Integer.parseInt(hostAndPort[1]);pool = new RoundRobinJedisPool.PooledObject(addr, new JedisPool(this.poolConfig, host, port, this.connectionTimeoutMs, this.soTimeoutMs, this.password, this.database, this.clientName, false, (SSLSocketFactory)null, (SSLParameters)null, (HostnameVerifier)null));}builder.add(pool);}} catch (Throwable var12) {LOG.warn("parse " + childData.getPath() + " failed", var12);}}this.pools = builder.build();var14 = addr2Pool.values().iterator();while(var14.hasNext()) {RoundRobinJedisPool.PooledObject pool = (RoundRobinJedisPool.PooledObject)var14.next();LOG.info("Remove proxy: " + pool.addr);pool.pool.close();}}

创建好的RoundRobinJedisPool.pools如下所示


当一个pool准备好,下一步我们的操作就是从pool中取出相应的jedis实例,并进行相关操作。这个pool里面只有服务正常的proxy,error的proxy会从zk被剔除掉,详见 Codis源码解析——proxy添加到集群

public Jedis getResource() {ImmutableList<RoundRobinJedisPool.PooledObject> pools = this.pools;if (pools.isEmpty()) {throw new JedisException("Proxy list empty");} else {int current;int next;do {//nextIdx初始值是-1current = this.nextIdx.get();//pools.size取决于集群中有多少个proxy,可以看到,负载均衡算法采取的是轮询next = current >= pools.size() - 1 ? 0 : current + 1;} while(!this.nextIdx.compareAndSet(current, next));return ((RoundRobinJedisPool.PooledObject)pools.get(next)).pool.getResource();}
}

Jodis首先让你本地的java程序通过zookeeper(或者etcd)连接到proxy,并监听proxy的变化,因为可能根据需要对proxy做水平扩容。监听过程中,对由PooledObject组成的pool进行刷新,然后每次需要对codis集群进行操作的时候,都按照轮询负载均衡算法从pool中取出一个jedis实例进行操作。

说明
如有转载,请注明出处:
http://blog.csdn.net/antony9118/article/details/77776072

Codis源码解析——Jodis相关推荐

  1. 豌豆夹Redis解决方案Codis源码剖析:Proxy代理

    豌豆夹Redis解决方案Codis源码剖析:Proxy代理 1.预备知识 1.1 Codis Codis就不详细说了,摘抄一下GitHub上的一些项目描述: Codis is a proxy base ...

  2. 谷歌BERT预训练源码解析(二):模型构建

    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明. 本文链接:https://blog.csdn.net/weixin_39470744/arti ...

  3. 谷歌BERT预训练源码解析(三):训练过程

    目录 前言 源码解析 主函数 自定义模型 遮蔽词预测 下一句预测 规范化数据集 前言 本部分介绍BERT训练过程,BERT模型训练过程是在自己的TPU上进行的,这部分我没做过研究所以不做深入探讨.BE ...

  4. 谷歌BERT预训练源码解析(一):训练数据生成

    目录 预训练源码结构简介 输入输出 源码解析 参数 主函数 创建训练实例 下一句预测&实例生成 随机遮蔽 输出 结果一览 预训练源码结构简介 关于BERT,简单来说,它是一个基于Transfo ...

  5. Gin源码解析和例子——中间件(middleware)

    在<Gin源码解析和例子--路由>一文中,我们已经初识中间件.本文将继续探讨这个技术.(转载请指明出于breaksoftware的csdn博客) Gin的中间件,本质是一个匿名回调函数.这 ...

  6. Colly源码解析——结合例子分析底层实现

    通过<Colly源码解析--框架>分析,我们可以知道Colly执行的主要流程.本文将结合http://go-colly.org上的例子分析一些高级设置的底层实现.(转载请指明出于break ...

  7. libev源码解析——定时器监视器和组织形式

    我们先看下定时器监视器的数据结构.(转载请指明出于breaksoftware的csdn博客) /* invoked after a specific time, repeatable (based o ...

  8. libev源码解析——定时器原理

    本文将回答<libev源码解析--I/O模型>中抛出的两个问题.(转载请指明出于breaksoftware的csdn博客) 对于问题1:为什么backend_poll函数需要指定超时?我们 ...

  9. libev源码解析——I/O模型

    在<libev源码解析--总览>一文中,我们介绍过,libev是一个基于事件的循环库.本文将介绍其和事件及循环之间的关系.(转载请指明出于breaksoftware的csdn博客) 目前i ...

最新文章

  1. asp.net程序性能优化的七个方面
  2. python开发好学吗-python难学吗
  3. java工程师考试题目_成功拿到Offer,Java工程师笔试题及答案!
  4. 利用TF-IDF提取新闻文章摘要
  5. 从身份证管理系统思考企业CMDB的建设
  6. Ext3 -- Form 实例。 用来migrate file 数据到DB用的
  7. linux下使用select实现精确定时器
  8. 14英寸电脑长宽多少_华为MateBook 14 2020款 14英寸轻薄笔记本王者升级
  9. Tensorflow2.0.0版本和Keras2.4.3不兼容
  10. matlab 中 x 轴的各种设置
  11. org.tinygroup.database-数据库元数据定义
  12. Qt 远程开关机 WakeOnLAN 编辑MagicPacket
  13. Linux驱动之设备树(设备树下的LED驱动实验)
  14. 以太网未识别的网络win10_win10以太网为什么无Internet未识别网络?
  15. Android动态破解微信本地数据库(EnMicroMsg.db)
  16. html怎么给图片加页码,在Word里,怎样让页码显示在插入的图片上?
  17. 使用Dism++备份系统文件并恢复
  18. landsat5数据下载1985年中国地区
  19. 服务器主板最多插多少块CPU,双路主板能不能只用一块CPU
  20. 关于 ProgPoW:来自芯片工程师的观点

热门文章

  1. jupyter notebook / jupyter lab 深色主题下如何设置字体 及 如何设置绘图颜色
  2. 【IOS】最简单方式实现跑马灯文字效果
  3. 软件工程课程设计——技术栈【Go+Vue+PGSQL】的人事管理系统
  4. 3108 小明爱换钱
  5. 江阴学院计算机科学系logo,江阴中专校徽暨校标(LOGO)征集活动
  6. SM2加密解决java与iOS端加解密不配套问题
  7. OPT3001DNPR人眼响应数字环境光照传感器
  8. 常用日期时间处理类封装DateTime(基于Carbon)
  9. 中国企业如何玩转海外媒体推广?
  10. 2023北京旅行计划 2023带父母北京旅行计划