1 ZooKeeper简介

  1. 官网https://zookeeper.apache.org/
  2. 是协调分布式应用程序而产生的一种分布式的协调服务。ZK暴露给客户端一组简单的原语,客户端可以通过它们实现同步、配置维护等功能
  3. ZK比redis实现分布式锁更方便
  4. ZK中数据保存在内存中,因此性能非常高

1.1 ZK的命名空间与数据结构

  1. ZK的命名空间类似标准文件系统中的命名空间。每个数据节点都由路径来标识,这样就保证了同名数据可以在不同位置上存储多份
  2. 但和标准文件系统不同的是,ZK命名空间中的节点可以存放数据和子节点,就好像文件系统的某个节点既是文件也是目录
  3. 我们通常使用znode表示ZK中的数据节点,每个znode最多存放1M的数据
  4. 不要将ZK当作数据库使用,因为整个ZK集群中,只有Leader提供写入数据的功能,大量的数据写入会导致需要的网络带宽增加,造成数据延时,影响ZK提供协调服务的能力

1.2 节点、临时(ephemeral)节点

  1. 客户端连接ZK后,ZK会为其创建一个session对象,临时节点会存放在session对象中,当出现客户端与服务端之间断开的情况,客户端会自动故障转移到其他服务器,这种情况下session对象不会消失。但如果出现会话超时、权限检查或是客户端主动退出程序等情况,会话就会直接关闭,临时节点也就跟着消失
  2. 由于该特性,ZK实现分布式锁就不需要像Redis那样麻烦,不需要设置过期时间,也不需要建立守护进程监控客户端状态
  3. 数据节点中最多放1M的数据,且二进制安全,即外界客户端推送什么字节数组,服务端就存放什么字节数组

1.3 ZK的工作模式

  1. ZK是以分布式的方式进行工作的,集群中主称为leader,从称为follower
  2. 当连接follower的客户端写入时,follower会统一转发给leader进行写入,然后leader再将写入的数据同步给所有follower。即写永远发生在leader中,而读可以发生在follower上,ZK自动实现了读写分离
  3. ZK集群分为两种运行时状态,可用和不可用。当leader宕机,整个服务就变为了不可用状态,但ZK可以快速的重新恢复出一个leader出来,默认时间为200ms以内

1.4 ZK的特性

  1. 顺序一致性:所有写入都通过leader,leader上维护了一个序列,可以保证命令按客户端发送的顺序执行
  2. 原子性:写入命令要么在所有服务器上都执行成功,要么都执行失败
  3. 单个系统镜像:客户端无论连接ZK集群中哪台服务器,看到的服务视图都是相同的,即使客户端故障转移到具有相同会话的其他服务器,客户端也永远不会看到系统的旧视图
  4. 可靠性:数据会被持久化,不会丢失
  5. 及时性:follower可以在一定时间范围内就完成对leader的数据同步
  6. 高可用:ZK本身就完成了哨兵的功能,leader宕机后可以快速选举出新leader从而恢复整个集群功能,而redis自身不是高可用的,需要借助哨兵实现高可用
  7. 高性能

2 ZK的安装

#1. 在所有服务器上编写/etc/hosts文件,这样ZK启动时才能识别下面的配置文件中配置的node01、node02、node03、node04的ip都是多少
vi /etc/hosts
192.168.246.128 node01
192.168.246.129 node02
192.168.246.130 node03
192.168.246.131 node04
#2. zk官网--Download--选最新版本--HTTP中选择一个链接右键复制链接地址
#3. 进入/opt/mashibing,下载并解压
wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.6.1/apache-zookeeper-3.6.1-bin.tar.gz
tar xf apache-zookeeper-3.6.1-bin.tar.gz
#4. 下载jdk7并解压,注意直接使用wget命令下载的内容不好用,需要到oracle官网下载后上传给linux
#5. 在所有服务器上配置环境变量
vi /etc/profile
export JAVA_HOME=/opt/mashibing/jdk1.8.0_261
export ZOOKEEPER_HOME=/opt/mashibing/apache-zookeeper-3.6.1-bin
export PATH=$PATH:$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin
#a. 加载配置文件到内存
source /etc/profile
. /etc/profile
#b. 配置成功后,输入java和zk + tab能显示相关命令
#c. 将配置文件分发,这之前需要在node01的/etc/hosts中添加node02、node03的地址。回车后需要根据提示输入对方密码
scp /etc/profile node02:/etc
scp /etc/profile node03:/etc
scp /etc/profile node04:/etc
#d. 可以在xshell下面写source /etc/profile,然后选全部会话,回车后,就会发送到所有你开启的机器上
#6. 编写配置文件zoo.cfg
cd /opt/mashibing/apache-zookeeper-3.6.1-bin/conf
cp zoo_sample.cfg zoo.cfg
vi zoo.cfg
#对于ZK集群心跳包是一种周期性的信号,用于检测leader判断follower状态是否正常。通常,机器间会每隔几秒钟发送一次心跳包。 如果follower没有在指时间内接收到心跳包,发送端就认为follower不正常
#a. leader和follower之间每次心跳间隔时间,2000ms
tickTime=2000
#b. follower初始连接leader时,能容忍的最多心跳数
initLimit=10
#c. Leader下发给follower命令时,允许的最多心跳数
syncLimit=5
#d. 持久化地址,最好不要用tmp目录,var在linux中一般负责存放临时数据
dataDir=/var/mashibing/zk
#e. 客户端连接zookeeper使用的端口号
clientPort=2181
#f. 当前ZK服务允许的最大客户端连接数
#maxClientCnxns=60
#g#g1:ZK集群中,当leader宕机后,重新选举leader时,需要过半的follower同意,写操作时,也必须有过半的follower数据和leader数据一致才认为写入成功,这个过半,就是下面server数/2+1得到的,也就是当前集群中,至少有4/2+1=3台follower同意#g2:redis中是通过在master上pubsub,让所有哨兵间互相认识并通信,对于ZK中,没有哨兵的概念,服务器自己就具有哨兵的功能,但由于ZK不支持pubsub,同一集群上服务器互相不认识,因此需要在此手工配置#g3:3888端口用于选举leader,选出leader后,leader自身会开启2888端口,用于接收其他所有follower的写请求#g4:1、2、3、4表示ZK服务的id号,优先选举id值最大的服务为leader。但由于当前配置中,只要有3个服务就能选举出一个leader,因此如果先启动了node01、node02、node03上的ZK服务,那么node03就会被推选为leader,当node04上ZK再启动,也还是follower#g5:根据redis中讲解过的哨兵个数的选择,一般此处应配置奇数台服务,此处为偶数是为了演示选leader的过程
server.1=node01:2888:3888
server.2=node02:2888:3888
server.3=node03:2888:3888
server.4=node04:2888:3888
#7. 在所有服务器上创建持久化目录,并在其内创建文件myid,写入ZK服务的id号
mkdir -p /var/mashibing/zk
cd /var/mashibing/zk
vi myid
#因为上面配置中node01对应server.1,所以此处配置文件中写入1
1
#8. 将node01中配置好的内容,分配给其他几个节点,因为拷贝整个目录,所以需要路径应以/结尾
scp -r ./mashibing/ node02:`pwd`

3 ZK的使用

  1. 服务器启动

    #1. 帮助文档
    zkServer.sh help
    #2. 默认后台启动,加start-foreground可以前台方式启动,此处为了方便观察实时观察ZK服务日志
    #3. 顺序启动node01、node02、node03上的ZK服务。3台服务器上ZK全启动之前,前两个服务器上的ZK通过zkServer.sh status命令,发现都是not running状态,直到第3个服务启动,才够4/2+1,才选出leader,follower会从leader上获取快照信息,和redis模式相同
    #4. 选出leader后,整个ZK集群才成为可用状态,可以对外提供服务,此时再次使用zkServer.sh status命令,发现node02和node01上ZK变为follower状态
    zkServer.sh start-foreground
    #5. 此时关闭node03上的现在是leader的ZK服务,发现node04上ZK服务自动变为leader
    #6. 但如果经过大量数据写入后再选举leader的时,会先比较谁的数据最完整,从数据最完整的ZK中选取一个id号最大的作为leader
    
  2. 客户端连接

    #1. 默认连接本机上的2181
    zkCli.sh
    #2. 查看支持的命令
    help
    #3. 查看根目下znode(数据节点)
    ls /
    #4. 创建znode,必须紧跟存放的数据内容,否则不会报错,但创建会失败
    #create /ooxx
    create /ooxx "123"
    #5. 查看存放在节点中的数据
    get /ooxx
    #6. 查看存放在节点中的数据,以及该节点的状态信息Stat
    get -s /ooxx
    #a. 数据节点的值
    123
    #b. 由于ZK的顺序一致性,每一个对znode树的写操作,都会被赋予一个全局唯一的ID,称之为zxid(ZooKeeper Transaction ID)。该值在leader上会维护,从1开始,每次递增1,cZxid就是创建当前数据节点时的zxid
    #c. 0x表示后面是一个16进制的数,16进制的数,每个数字可以转换为4位2进制数,将事务号转为2进制,后32位中4表示这是leader上的第4个指令,前4位为2,表示leader的纪元,表示这是整个ZK集群中推举出的第2任leader所记录的zxid
    #d. 每当客户端连接,就会在某server上创建了session对象,为保证所有服务器上拥有相同session对象,该session会被写入到leader,然后自动同步到其他所有服务器上,因此会消耗zxid,
    cZxid = 0x200000004
    #c. 创建时间
    ctime = Sun Jul 19 22:47:23 PDT 2020
    #d. 最近一次修改该数据节点的zxid
    mZxid = 0x200000011
    #e. 最近一次修改该数据节点的时间
    mtime = Sun Jul 19 23:28:33 PDT 2020
    #f. 当前节点下,创建的最后一个子节点的zxid,例:create /ooxx/xxoo ""的zxid
    pZxid = 0x200000006
    cversion = 1
    dataVersion = 3
    aclVersion = 0
    #g. 创建临时节点的客户端的session id,如果该节点为临时节点,此属性就会有值
    ephemeralOwner = 0x0
    dataLength = 3
    numChildren = 1
    #7. 创建序列节点
    #a. -s表示sequence,创建节点时,为节点名后自动拼接一个序列号
    #b. 会自动创建/abc0000000000,其他客户端执行该命令后,自动创建/abc0000000001,可以防止多个客户端创建同一节点时,后面客户端将前面客户端数据节点覆盖
    #c. 通过该命令返回值就能拿到具体创建的数据节点的名称
    #d. 如果通过rmr /abc/xxx000000000001,删除节点,之后再创建,实际上还是会从2开始,不会从1开始
    create -s /abc "dfdf"
    #8. 创建临时节点。创建临时节点的客户端退出后,该客户端对应的session消失,临时节点也跟着消失
    create -e /xoxo "ooodkjfj"
    #9. 为已存在节点,更新数据
    set /abc "2345"
    #10. 删除没有子节点的节点
    delete /xoxo
    #11. 删除拥有子节点的节点,注意每个版本客户端命令并不相同,视频中版本删除为rmr,但3.6.1改为deleteall
    deleteall /xoxo
    

4 ZK的实际应用

需要client代码实现,ZK只是提供功能支持

  1. 配置管理:多台客户端共享同一配置文件,通过访问同一节点实现
  2. 统一命名服务:分布式系统中,经常需要给一个资源生成一个唯一的ID,通过序列节点实现
  3. 分布式锁:通过临时节点实现,且可以通过临时序列节点来完成队列式或事物式的锁模型
  4. 分组管理:通过目录一样的数据结构实现
  5. 集群管理:可以进行选主

5 ZK服务上的连接

#1. egrep表示开启正则表达式,后面表示查找2888或3888
#2. 分别查看node1、node02、node03上的监听
#a. node01:由于不是leader,所以启动一个3888端口监听,node02和node03上各启动一个随机端口,连接3888,同时自身启动一个随机端口连接leader的2888
#b. node02:启动一个3888端口监听,由于node02和node01已经相连,所以只在node03上启动一个随机端口与node02的3888相连。同时身启动一个随机端口连接leader的2888
#c. node03:是leader,所以启动2888和3888这两个端口作为监听,由于已经启动了随机端口与node01和node02相连,因此其自身的3888不再有人与其连接
#d. 总的来说就是每两个ZK之间都会通过一台机器上的3888接口和另一台机器上的随机端口相连,用于选举leader。而每个follower中又会有随机端口和leader的2888相连,用于向leader发送write请求
#总结
netstat -natp|egrep '(2888|3888)'

6 observer

  1. observer:和leader、follower概念并列,作为observer的服务器,不能参与投票,只提供读取数据功能

  2. ZK集群中,follower太多会导致选举速度变慢,通常包含1个leader,个位数、奇数个follower,以及大量的observer

  3. 通过zoo.cfg配置文件将某个ZK服务指定为observer

    server.4=node04:2888:3888:observer
    

7 Paxos算法与ZAB协议

7.1 Paxos算法

  1. 基于消息传递通信模型的分布式系统,不可避免发生进程慢、被杀死、重启、消息延迟、丢失、重复等问题
  2. Paxos 算法解决的问题是在一个可能发生上述异常的分布式系统中如何就某个值达成一致,保证不论发生以上任何异常,都不会破坏决议的共识
  3. Paxos算法的前提是分布式系统中不存在拜占庭将军问题(即绝对不会传递错误的消息)
  4. 在百度中,可以指定在某个网站内搜索内容,例:paxos site:douban.com

7.2 ZAB协议

  1. ZooKeeper的原子广播协议,是Paxos算法的一个简易实现
  2. 原子:当产生数据修改,ZK集群中所有节点,要么全部成功,要么全部失败,没有中间状态。基于队列+2阶段提交实现
  3. 广播:不代表所有人都能听到,强调过半通过即可
  4. ZAB协议只作用在整个ZK集群可用的状态下,即leader存在的情况下,leader宕机,整个ZK集群进入不可用状态,直到选出新的leader之前,所有节点不再提供服务。这是因为没有leader就没法保证自身数据最新,且由于无法接收leader同步的数据,无法达到最终一致性
  5. 流程
    1. client发送给follower创建ooxx节点的指令
    2. follower将该指令发送给leader
    3. leader生成一个zxid
  6. 两阶段提交
    1. leader与每个follower都维护了一个队列
    2. 第一阶段:leader通过队列,按顺序,要求所有follower写日志记录创建节点这个动作。ZK的数据存放在内存,而日志存放在硬盘。写日志成功后,follower返回ok给leader,此时已经有leader、回复ok的这个follower两个人同意了这个方案,超过了半数
    3. 第二阶段:leader通过队列,按顺序,要求所有follower将命令真正写入内存
    4. 这样只要其他follower最终通过队列执行完了所有leader发出的指令,就能实现最终一致性
    5. leader返回ok给follower,表示创建节点成功
    6. follower返回ok给客户端

  1. client从follower上读取的数据不一定最新,但可以使用sync指令,强制同步leader队列中的数据,从而保证获取到的数据一定是最新的。该方法不会阻塞,当同步完后将最新结果返回给回调方法,并调用回调方法

8 ZK集群选主

  1. 只要能存在某zxid,之前一定有过半的ZK持有这个zxid所产生的数据,而既然还能选出主,一定存在过半的ZK还活着,那么就说明,活着的follower中,一定有ZK有之前最大的zxid,且存放着之前最大的那个zxid所产生的数据
  2. 假设现有3台follower
    1. node01:myid为1,zxid为8
    2. node02:myid为2,zxid为8
    3. node03:myid为3,zxid为7
  3. 第一次启动集群流程
    1. 只要启动的节点数超过一半,直接选当前最大的myid对应的节点为leader
  4. leader宕机重选leader流程
    1. node03率先发现作为leader的node04宕机,于是发起投票,由于此时node03不知道其他人信息,所以他会告诉node01和node02,自己给自己也就是node03投出一票
    2. 此时node02和node01接收到node03的通知,先对比自身与node03的zxid,如果相同,再比较myid,然后告诉其他所有节点(包括node03),自己投票给自身当前记录中zxid、myid最大的节点
    3. 每当自身收到新的投票,就检查自身当前记录中zxid、myid最大的节点,是否因为这个新的投票而导致更新,如果需要更新,就告知给其他所有节点
    4. 几轮过后,所有节点最终都会投票给zxid、myid最大的node02,此时对于所有节点的当前记录中,都是node02满票通过
    5. 所有节点检查自身是否为全票通过的这个新leader,如果是,启动自身2888端口,如果不是启动一个随机端口连向新leader的2888,随后leader会把自身zxid与follower的zxid相差的内容的日志,传递给各个follower
    6. 本质上就是只要任何人发起投票,就会触发那个zxid、myid最大的节点发起对自己的投票,最后所有节点对选举的新leader达成一致
  5. 实验
    1. 后台启动node01、node02、node03:zkServer.sh start,查看各节点状态 zkServer.sh status发现node03为leader,因为node03的myid最大
    2. 关闭node03上的ZKzkServer.sh stop,发现node02变为了leader,这是因为zxid前4位为大版本号,node02参与过第一任leader的统治,大版本号为1,而node04的大版本号还是0,且node02的myid大于node01的myid,因此是node02变为leader

9 watch

  1. 可以用来协调两个不同的服务,比如图中serverB需要使用serverA提供的服务,可以在serverB中记录serverA的ip地址与端口,然后远程调用serverA的方法,但serverB无法知道什么时候serverA会挂,serverA即使挂了,serverB仍然会调用serverA,当没有ZK时,可以在serverB和serverA上建立心跳,每隔一段时间去探测serverA是否还活着
  2. 但心跳会有一定的时间延迟,例如3s一次心跳,但刚心跳完serverA就宕机了,此后接近3s内,serverB都无法发现serverA宕机,因此可以使用ZK中的watch解决这个问题
  3. 流程
    1. serverA创建一个临时节点/ooxx/a存放于session中
    2. serverB通过get /ooxx/a得到serverA的ip地址等信息
    3. serverB在/ooxx/a数据节点上添加watch监控,并且注册回调方法,这样当数据节点上发生事件,比如create、delete、change、children,那么ZK就会回调之前watch中注册的回调方法
    4. 此时一旦serverA宕机,其创建的临时节点就会消失,那么就会立即触发delete事件,此时ZK就会回调watch中注册的方法告知serverB,serverA已经宕机的信息

10 API简单使用

  1. 创建maven项目–选择org.apache.maven.archetypes:maven-archetype-quickstart

  2. 添加pom依赖,注意对于zookeeper,选择的版本必须和服务端版本一致,否则会出现问题,此处选3.6.1

  3. 添加log4j配置文件

    1. 右键main–New–Directory–resources
    2. 右键resources–Mark Directory as–Test Resources Root
    3. cp log4j.properties /mnt/hgfs/wusihan
  4. App.java

    package com.msb.zookeeper;import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.Stat;import java.util.concurrent.CountDownLatch;/*** Hello world!*/
    public class App {public static void main(String[] args) throws Exception {System.out.println("Hello World!");final CountDownLatch cd = new CountDownLatch(1);//1. ZK中只有session的概念,没有连接池的概念,下面语句会在ZK中创建一个session//2. connectString:给出ZK集群中所有节点的ip和端口,会随机连接其中一个//3. sessionTimeout:session的超时时间,3000ms//a. 服务器和客户端之间维持的是一个长连接,客户端会定时向服务器发送heart_beat,以确定客户端是否正常连接//b. 接收到心跳反馈后,服务器重置下次sessionTimeout时间。每次心跳间隔时间在配置文件中定义//c. 如果在sessionTimeout时间内服务端都没接收到客户端的心跳反馈,server就将该session设置为过期,并删除session创建的临时节点,此后客户端再想使用这个过期的session,会报错KeeperErrorCode = ConnectionLoss//d. 打断点调试过程中或直接停止客户端,会导致客户端无法接收心跳,从而导致session过期,为防止这种情况,应将该值设置大一些//e. 该值不能无限增大,ZK服务器端对会话超时时间是有限制的,主要是minSessionTimeout和maxSessionTimeout这两个参数//4. watcher:分为两类//a. 创建ZK时传入的watcher,为default watcher,用于监控session,主要用于监控client之前连接的server宕机后,重新连接到其他server中的过程// session进入不同状态时,会回调default watcher中的process方法,且每次回调后还会自动重新注册,继续监控session//b. 调用读类型操作时传入的watcher,用于监控数据节点,数据节点发生变化后,回调其process方法,且回调一次后就失效,不会再次自动注册final ZooKeeper zk = new ZooKeeper("192.168.246.128:2181,192.168.246.129:2181,192.168.246.130:2181,192.168.246.131:2181",3000, new Watcher() {@Overridepublic void process(WatchedEvent event) {//state表示session的状态Event.KeeperState state = event.getState();//type表示事件的类型Event.EventType type = event.getType();//path表示在哪个路径String path = event.getPath();System.out.println("new zk watch: " + event.toString());switch (state) {case Unknown:break;case Disconnected:break;case NoSyncConnected:break;case SyncConnected:System.out.println("connected");cd.countDown();break;case AuthFailed:break;case ConnectedReadOnly:break;case SaslAuthenticated:break;case Expired:break;}switch (type) {case None:break;case NodeCreated:break;case NodeDeleted:break;case NodeDataChanged:break;case NodeChildrenChanged:break;}}});//防止session还没进入SyncConnected状态,就调用zk对象的create方法,从而导致创建节点失败cd.await();ZooKeeper.States state = zk.getState();//如果不执行cd.await(),state很有可能是CONNECTING而不是CONNECTEDswitch (state) {case CONNECTING:System.out.println("ing......");break;case ASSOCIATING:break;case CONNECTED:System.out.println("ed........");break;case CONNECTEDREADONLY:break;case CLOSED:break;case AUTH_FAILED:break;case NOT_CONNECTED:break;}//1. 写类型操作有两种api//a. 同步阻塞:写入成功前一直阻塞,返回创建的节点名名称//b. 异步回调:需要传入回调函数,返回值为void//2. 此处使用同步阻塞方法,分别传入数据节点路径、数据、权限等级(无限制)、节点类型(临时节点)//3. 写类型的操作中不能加入watch,watch的注册只发生在读类型调用中,例如get、exists等String pathName = zk.create("/ooxx", "olddata".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);final Stat stat = new Stat();//1. 读类型操作也有两种api,同步阻塞与异步回调,异步回调方法返回值为void,且需要提供回调函数,此处使用同步方法//2. 读操作可以传入一个Watcher对象,用于监控/ooxx数据节点,一旦该节点发生变化,就会回调Watcher中的process方法。//3. 传入Watcher的位置也可以传入一个bollean值,如果为false表示不使用Watcher监控该节点,如果为true,使用默认的Watcher,也就是创建ZK时放入的那个Watcherbyte[] node = zk.getData("/ooxx", new Watcher() {@Overridepublic void process(WatchedEvent event) {System.out.println("getData watch: " + event.toString());try {//1. 当下面第一次调用setData方法后,会触发回调,但调用读类型的操作的回调后,当前Watcher就不再继续监听该节点,如果需要继续监听,应重新注册//true   default Watch  被重新注册   new zk的那个watch//2. 使用当前Watcher对象重新监听/ooxx节点,也可以通过zk.getData("/ooxx",true,stat);使用默认的Watcher监听/ooxxzk.getData("/ooxx", this, stat);} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}}, stat);System.out.println(new String(node));//修改/ooxx的值,会触发/ooxx上的Watcher的process方法的回调,最后的0表示版本号Stat stat1 = zk.setData("/ooxx", "newdata".getBytes(), 0);//再次修改/ooxx的值,由于上面代码在第一次触发回调后,在回调中又注册了Watcher,所以此处又会触发回调Stat stat2 = zk.setData("/ooxx", "newdata01".getBytes(), stat1.getVersion());//1. 执行main方法时日志中会打印产生的sessionid、连接的follower,通过zkCli.sh发现临时节点的sessionid和这个相同//2. 关闭当前客户端连接的follower,zkServer.sh stop//3. 之前创建的ZK对象时注册了默认的Watcher,该Watcher可以监控session上发生的事件,通过日志发现客户端会将连接切换到其他机器上,且sessionid值不变System.out.println("-------async start----------");//1. 读操作使用异步回调的方式:getData返回值为void,获取到数据节点的值后,回调AsyncCallback.DataCallback的processResult方法//2. 会先打印start,然后over,最后打印回调zk.getData("/ooxx", false, new AsyncCallback.DataCallback() {@Override//状态吗、节点、 传入的那个“abc”、 数据值、元数据值public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {System.out.println("-------async call back----------");System.out.println(ctx.toString());System.out.println(new String(data));}}, "abc");System.out.println("-------async over----------");//防止程序直接退出而测试不到follower宕机的情况Thread.sleep(2222222);}
    }
    

11 zookeeper实际应用

  1. 正常来说以下功能都不需要人为实现,都使用现成功能的API

11.1 配置管理

  1. 分布式配置:分布式集群中,有些配置文件在每台机器上都是相同的,每次需要修改配置时都需要依次在所有机器上修改,非常麻烦,希望有一种技术可以做到统一访问一个位置,且当配置改动,所有机器中都可以生效。如果利用数据库或redis实现,那么每隔一段时间就需要需要重新扫描一下数据库或redis,保证内存中配置和数据库中最新内容一致。因此可以考虑使用使用ZK实现,用watch监控配置所在数据节点, 一旦数据节点修改,就可以回调Watcher中注册的方法,根据最新的数据节点的值更新内存中作为配置的数据

  2. Dubbo中就是使用ZK实现的注册中心

  3. DefaultWatch:封装Watcher,提供CountDownLatch功能,保证ZK的session进入SyncConnected状态前不允许使用

    package com.msb.zookeeper.config;import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;import java.util.concurrent.CountDownLatch;public class DefaultWatch implements Watcher {CountDownLatch cc;public void setCc(CountDownLatch cc) {this.cc = cc;}@Overridepublic void process(WatchedEvent event) {System.out.println(event.toString());switch (event.getState()) {case Unknown:break;case Disconnected:break;case NoSyncConnected:break;case SyncConnected:cc.countDown();break;case AuthFailed:break;case ConnectedReadOnly:break;case SaslAuthenticated:break;case Expired:break;}}
    }
    
  4. ZKUtils:便捷地提供session状态为SyncConnected的ZK对象

    package com.msb.zookeeper.config;import org.apache.zookeeper.ZooKeeper;import java.util.concurrent.CountDownLatch;public class ZKUtils {private static ZooKeeper zk;//指定这个ZK对象的根路径为/testConf,也就是所有这个ZK对象访问的数据节点前,都是相对/testConf的路径private static String address = "192.168.246.128:2181,192.168.246.129:2181,192.168.246.130:2181,192.168.246.131:2181";private static DefaultWatch watch = new DefaultWatch();//防止zk还没建立完连接就被使用private static CountDownLatch init = new CountDownLatch(1);public static ZooKeeper getZK() {try {zk = new ZooKeeper(address, 3000, watch);watch.setCc(init);init.await();} catch (Exception e) {e.printStackTrace();}return zk;}
    }
    
  5. MyConf

    package com.msb.zookeeper.config;//从ZK中读取到的配置文件在JVM中存放的位置,用conf字符串代表企业中真正使用的配置内容
    public class MyConf {private String conf;public String getConf() {return conf;}public void setConf(String conf) {this.conf = conf;}
    }
    
  6. WatchCallBack

    package com.msb.zookeeper.config;import org.apache.zookeeper.AsyncCallback;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.Stat;import java.util.concurrent.CountDownLatch;//1. 可以在需要传入Watcher、exists方法的回调函数、get方法的回调函数时,都统一使用这一个WatchCallBack对象
    public class WatchCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback {ZooKeeper zk;MyConf conf;CountDownLatch cc = new CountDownLatch(1);public MyConf getConf() {return conf;}public void setConf(MyConf conf) {this.conf = conf;}public ZooKeeper getZk() {return zk;}public void setZk(ZooKeeper zk) {this.zk = zk;}//1. /testConf/AppConf如果不存在,就一直阻塞,如果存在获取数据,放入confpublic void aWait() {zk.exists("/AppConf", this, this, "ABC");try {cc.await();} catch (InterruptedException e) {e.printStackTrace();}}//getData方法获取到数据后使用的回调,data为数据节点值@Overridepublic void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {if (data != null) {String s = new String(data);conf.setConf(s);cc.countDown();}}//exists方法获取到结果后的回调,没有返回结果,只有stat,如果stat不为null,就表示数据节点存在@Overridepublic void processResult(int rc, String path, Object ctx, Stat stat) {if (stat != null) {//由于创建zk时,指定根是/testLock,所以这个/AppConf是/testLock/AppConfzk.getData("/AppConf", this, this, "sdfs");}}//Watcher监控的节点产生事件后进行的回调@Overridepublic void process(WatchedEvent event) {//获取事件类型switch (event.getType()) {case None:break;case NodeCreated://1. 当第一次调用TestConfig的getConf方法,如果数据节点不存在,getData方法的回调中,不会走cc.countDown()所在分支,因此TestConifg的getConf会一直阻塞//2. 所以一旦数据创建,应马上调用getData,从而重新触发getData的回调,这样才能释放锁,并为MyConf赋值zk.getData("/AppConf", this, this, "sdfs");break;case NodeDeleted://1. 如果数据节点被删除时的处理,如果你认为这可能是误删,不应该处理conf中的值,那么就不用调用下方逻辑//2. 如果你认为应该清空之前读取到的配置文件,就要进行如下处理conf.setConf("");cc = new CountDownLatch(1);break;case NodeDataChanged://一旦数据节点的值变化,调用getData,触发getData的回调,从而将更新后数据放入MyConf对象中zk.getData("/AppConf", this, this, "sdfs");break;case NodeChildrenChanged:break;}}
    }
    
  7. TestConfig:多线程下调试会有问题,aWait方法会一直阻塞,推测是因为客户端由于debug而阻塞,无法正确接收回调导致

    package com.msb.zookeeper.config;import org.apache.zookeeper.ZooKeeper;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;public class TestConfig {ZooKeeper zk;@Beforepublic void conn() {zk = ZKUtils.getZK();}@Afterpublic void close() {try {zk.close();} catch (InterruptedException e) {e.printStackTrace();}}@Testpublic void getConf() {WatchCallBack watchCallBack = new WatchCallBack();watchCallBack.setZk(zk);MyConf myConf = new MyConf();watchCallBack.setConf(myConf);//该方法用于从ZK的/testConf/AppConf数据节点中读取内容存放到myConf中,且如果数据不存在就阻塞watchCallBack.aWait();//数据节点存在会跑到下面//此处while循环是为了方便随时通过zkCli.sh修改数据节点值来观察结果while (true) {//如果数据节点被删除if (myConf.getConf().equals("")) {System.out.println("conf diu le ......");//1. 相当于MyConf配置文件没初始化成功,因此不应该继续执行//2. 但之前设置值的时候,CountDownLatch已经被减过1,所以里面await不生效,因此需要Watcher在监控到删除节点的事件时,重新上一把锁watchCallBack.aWait();} else {System.out.println(myConf.getConf());}try {//此处是为了防止打印太快,不方便测试观察Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}}}
    }
    
  8. 实验流程

    1. 由于人为指定了zk的根节点为/testConf,所以需要先通过zkCli.sh连接ZK,并创建该节点
    2. 启动TestConfig,此时阻塞
    3. 创建/testConf/AppConf,此时循环打印该子节点值
    4. 修改子节点,此时循环打印新值

11.2 分布式锁

  1. 几种分布式锁的实现方案

    1. 方案一:主动轮巡,发送心跳包监控其他机器上的锁是否被释放了。会有延迟、压力(例如900个机器都在轮巡)
    2. 方案二:使用watch+临时节点,通过回调解决延迟的问题。但仍然会有压力问题,ZK会回调999个客户端,通信上有压力,且这999个还需要新一轮抢锁
    3. 方案三:使用watch+临时的序列节点
      1. 1000个客户端都创建一个临时的序列节点
      2. 判断如果自身是当前父目录下最小的节点,就继续进行,业务逻辑执行成功后,删除当前节点
      3. 如果不是最小的节点,监控比自身小1的那个节点,当那个节点消失,自身就可以继续向下执行,抢锁的成本仅仅是ZK触发前一个节点消失时的回调的成本。注意不能监控父节点,因为触发children事件后,还是会回调所有人造成压力
  2. WatchCallBack:实现分布式加锁和解锁的逻辑

    package com.msb.zookeeper.lock;import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.Stat;import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;public class WatchCallBack implements Watcher, AsyncCallback.StringCallback, AsyncCallback.Children2Callback, AsyncCallback.StatCallback {ZooKeeper zk;String threadName;CountDownLatch cc = new CountDownLatch(1);String pathName;public String getPathName() {return pathName;}public void setPathName(String pathName) {this.pathName = pathName;}public String getThreadName() {return threadName;}public void setThreadName(String threadName) {this.threadName = threadName;}public ZooKeeper getZk() {return zk;}public void setZk(ZooKeeper zk) {this.zk = zk;}//加一个分布式锁public void tryLock() {try {//创建一个临时的序列节点,创建后触发回调zk.create("/lock", threadName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this, "abc");//如果自身创建的临时序列节点是整个父目录下最小的那个才继续执行,否则阻塞cc.await();} catch (InterruptedException e) {e.printStackTrace();}}//create的回调@Overridepublic void processResult(int rc, String path, Object ctx, String name) {//表示节点创建成功,name会是/testLock/lock0000000001这种格式if (name != null) {System.out.println(threadName + "  create node : " + name);pathName = name;//查看/testConf下所有子节点,不需要加watch,因为不是想对整个父目录进行监控,获取到子节点后触发回调,判断自身是否最小,是则继续执行,否则阻塞zk.getChildren("/", false, this, "sdf");}}//getChildren的回调@Overridepublic void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {//回调中是一定能看到自己创建的子节点之前的那些子节点的
    //        System.out.println(threadName+"look locks.....");
    //        for (String child : children) {//            System.out.println(child);
    //        }//list中内容是无序的,想判断自己是不是最小的,需要先排序Collections.sort(children);//pathName是带最后的斜线的,所以需要去掉int i = children.indexOf(pathName.substring(1));//如果自身是所有子节点中最小的,自己测试时,这块一直不对,是因为自己测试时,根目录用了相同的/testConf,而/testConf下有之前建立的AppConf,这导致pathName索引永远不是0,而是1,删除/testConf/AppConf即可if (i == 0) {try {//1. 可以将父节点设置上值,这样在tryLock方法中,可以判断当前线程是否和父节点的值相同//a. 如果相同表示当前线程现在就是该锁的持有者,不用再次创建节点并await了,防止同时加了/lock0000000001和/lock0000000500两把锁导致虽然1和500在同一线程,但500却必须等到前面499个内容释放锁//2. 这样tryLock就变为了一个可重入锁,但此处tryLock中并未完成该逻辑zk.setData("/", threadName.getBytes(), -1);//让自己之前的await阻塞过去cc.countDown();} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}} else {//a. 如果不是子节点中最小的,继续等待,并监控自身的前一个数据节点。这样一旦前一个节点对应的进程结束,前一个节点消失,就触发Watcher中回调,从而让自身再次能够尝试抢锁//b. 但如果前一个节点业务执行过快,那么可能无法触发Watcher中回调,因此需要加上一个新的回调函数进行处理zk.exists("/" + children.get(i - 1), this, this, "sdf");}}//exists的回调@Overridepublic void processResult(int rc, String path, Object ctx, Stat stat) {zk.getChildren("/", false, this, "sdf");}//释放分布式锁public void unLock() {try {//删除节点即可,-1表示忽略版本判定zk.delete(pathName, -1);System.out.println(threadName + " over work....");} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}//Watcher中的回调@Overridepublic void process(WatchedEvent event) {//当某一个节点删除后,其实只有其后面的一个节点收到了回调事件switch (event.getType()) {case None:break;case NodeCreated:break;case NodeDeleted://前一个节点消失了,需要触发getChildren的回调,才能调用自身的cc.countDown();从而继续执行。只是为了触发回调,因此无需添加Watcherzk.getChildren("/", false, this, "sdf");break;case NodeDataChanged:break;case NodeChildrenChanged:break;}}
    }
    
  3. TestLock:为10个线程去加分布式锁,看是否能实现加锁段代码不会同时执行

    package com.msb.zookeeper.lock;import com.msb.zookeeper.config.ZKUtils;
    import org.apache.zookeeper.ZooKeeper;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;public class TestLock {ZooKeeper zk;@Beforepublic void conn() {zk = ZKUtils.getZK();}@Afterpublic void close() {try {zk.close();} catch (InterruptedException e) {e.printStackTrace();}}@Testpublic void lock() {//实际环境中,这十个线程位于不同的服务器上,他们想操作同一种资源,比如数据库。正常如果在同一个JVM中,根本无需分布式锁,使用正常的锁就可以使实现for (int i = 0; i < 10; i++) {new Thread() {@Overridepublic void run() {WatchCallBack watchCallBack = new WatchCallBack();watchCallBack.setZk(zk);//方便在WatchCallBack中打印线程名和锁之间的关系String threadName = Thread.currentThread().getName();watchCallBack.setThreadName(threadName);//每一个线程://抢锁watchCallBack.tryLock();//干活,锁中的代码,多个线程间无法同时执行,因此判断锁生效System.out.println(threadName + " working...");//释放锁watchCallBack.unLock();}}.start();}while (true) {}}
    }
    

12 CAP定理

  1. 在一个分布式系统中,强一致性、可用性、分区容错,三者无法同时满足。且由于分区容错是必须要保证的,因此我们设计分布式系统时,必须在强一致性和可用性之间有所取舍和平衡

12.1 强一致性

  1. 各节点的数据保证一致,每次成功写入之后,无论从哪个节点读取,都能读取到最新数据,相当于向所有节点的写操作整体,是原子操作(要么全部失败要么全部成功)
  2. 一致性分类
    1. 强一致性:写操作完成后,后续的读操作都能看到最新数据
    2. 弱一致性:能容忍部分或全部都看不到最新数据
    3. 最终一致性:经过一段时间后,都能看到最新数据

12.2 可用性

  1. 每次向未崩溃的节点发送请求,总能保证马上收到响应数据,不允许有延时

12.3 分区容错性

  1. 大多数分布式系统都分布在多个子网络,每个子网络就叫做一个区,区之间网络传输出现问题,系统仍然可以正常运行
  2. 分区是大概率发生事件,如果一旦出现网络分区系统就无法提供服务,那么这个系统大部分时间都无法对外提供服务,这是不可能允许的,所以说分区容错性是必须要保证的

12.4 强一致性与可用性的矛盾

  1. 假设此时节点1和节点2网络不通,产生分区,由于我们保证了分区容错性,所以此时系统仍要对外提供服务
  2. 当客户端向节点1发送写请求时,为了保证节点1和节点2的强一致性,节点1必须在写操作完成之前,锁定节点2上的读写操作,当数据同步后,才能重新开放节点2的读写,锁定期间,节点2无法读写
  3. 那么由于网络故障,节点2数据迟迟无法得到同步,那么对节点2进行读操作的客户端,就只能迟迟得不到节点2的返回的数据,这就违背了可用性
  4. 同理如果追求可用性,就无法达到强一致性

12.5 强一致性与可用性取舍

  1. 可用性 > 强一致性:发布网页,多个服务器有这张网页的副本。后来发现一个错误,需要更新网页,这时只能每个服务器都更新一遍,一般来说,网页的更新不是特别强调一致性,短时期内,一些用户拿到老版本,另一些用户拿到新版本,问题不会特别大。当然,所有人最终都会看到新版本

2. Zookeeper相关推荐

  1. 常用的高性能 KV 存储 Redis、Memcached、etcd、Zookeeper 区别

    1. 什么是 KV 存储 KV 是 Key-Value 的缩写,KV 存储也叫键值对存储.简单来说,它是利用 Key 做索引来实现数据的存储.修改.查询和删除功能. 常用的高性能 KV 存储主要有 R ...

  2. kafka+zookeeper搭建步骤kafka问题

    kafka+zookeeper搭建步骤 帅气的名称被占用关注 0.1392018.12.04 13:48:00字数 1,007阅读 88 vmware 安装centOS7 克隆虚拟为:三台 本地你的I ...

  3. ZooKeeper简单使用

    ZooKeeper简单使用 ZooKeeper简单使用 1.ZooKeeper简介 2.ZooKeeper能做什么 3.ZooKeeper核心 3.1.ZooKeeper安装 3.2.ZooKeepe ...

  4. 2021年大数据ZooKeeper(六):ZooKeeper选举机制

    目录 ​​​​​​ZooKeeper选举机制 概念 全新集群选举 非全新集群选举 ZooKeeper选举机制 zookeeper默认的算法是FastLeaderElection,采用投票数大于半数则胜 ...

  5. 2021年大数据ZooKeeper(五):ZooKeeper Java API操作

    目录 ZooKeeper Java API操作 引入maven坐标 节点的操作 ZooKeeper Java API操作 这里操作Zookeeper的JavaAPI使用的是一套zookeeper客户端 ...

  6. 2021年大数据ZooKeeper(四):ZooKeeper的shell操作

    目录 ZooKeeper的shell操作 客户端连接 shell基本操作 操作命令 操作实例 节点属性 ​​​​​​​ZooKeeper Watcher(监听机制) ​​​​​​​Watch机制特点 ...

  7. 2021年大数据ZooKeeper(三):Zookeeper数据模型和节点类型

    目录 Apache ZooKeeper Zookeeper数据模型 Zookeeper节点类型 Apache ZooKeeper Zookeeper数据模型 图中的每个节点称为一个Znode. 每个Z ...

  8. 2021年大数据ZooKeeper(二):ZooKeeper集群搭建

    目录 ZooKeeper集群搭建 第一步:下载zookeeeper的压缩包,下载网址如下 第二步:解压 第三步:修改配置文件 第四步:添加myid配置 ​​​​​​​第五步:安装包分发并修改myid的 ...

  9. 2021年大数据ZooKeeper(一):ZooKeeper基本知识

    目录 Zookeeper基本知识 ZooKeeper概述 ZooKeeper特性 ZooKeeper集群角色 Leader: Follower: Observer: Zookeeper基本知识 Zoo ...

  10. ZooKeeper简介和概念知识

    1. 简介 ZooKeeper是一种分布式协调服务,用于管理大型主机.在分布式环境中协调和管理服务是一个复杂的过程. ZooKeeper通过其简单的架构和API解决了这个问题.ZooKeeper允许开 ...

最新文章

  1. 别和 Python 说再见了!!
  2. httpd四之CGI、HTTPS、压缩配置
  3. navigator.geolocation的应用 - 将定位信息显示在百度地图上
  4. Kotlin学习记录1
  5. centos mysql 互为主从_centos7 mysql互为主从+keepalived
  6. vue封装websocket_有关WebSocket必须了解的知识
  7. linux下查看一个进程的启动时间和运行时间
  8. Linux 离线安装软件
  9. 初一计算机第6单元,青岛出版社初中信息技术 七年级下册第二单元 第6课 海报设计 教学设计...
  10. python第三十二天-----算法
  11. RHadoop实践系列文章
  12. 路由器刷机突破校园网限制
  13. 操作系统 进程调度实验报告
  14. java web构建_使用Java构建一个宁静的Web服务
  15. wechat-0051,微信公众号,第三方登录—扫码绑定
  16. 看完小i机器人诉苹果,惊呼这也行?
  17. Spring框架中 自动装配的详解 属性值的详解
  18. 50行Python代码,一键获取微博热点!
  19. 什么是生产管理系统?
  20. Personal views on domain change of several theorems and applications

热门文章

  1. 减少HTTP请求的四种方式
  2. (Python)Numpy矩阵增加/减少一个维度
  3. Altium Designer--如何快速查看PCB网络布线
  4. 软件测试需要学习什么
  5. 【机器学习】算法原理详细推导与实现(七):决策树算法
  6. 最逼近Mac OS的Linux系统 -- Elementary OS
  7. 如何查看谷歌浏览器中保存的密码
  8. 【国际】费城联邦储备银行会议探索区块链对金融稳定的影响
  9. 如何留住优秀的测试人员
  10. 操作系统真象还原实验记录之实验七:加载内核