个人博客:https://suveng.github.io/blog/​​​​​​​

源码:https://gitee.com/suwenguang/test

zookeeper集群角色:

  1. leader 主
  2. follower 从
  3. observer 观察者 :不参与写的选举,但是提供读

概念:

  1. 数据模型

    zookeeper的数据模型和文件系统类似,每一个节点称为:znode. 是zookeeper中的最小数据单元。每一个znode上都可以

    保存数据和挂载子节点。 从而构成一个层次化的属性结构

    节点特性

    持久化节点 : 节点创建后会一直存在zookeeper服务器上,直到主动删除

    持久化有序节点 :每个节点都会为它的一级子节点维护一个顺序

    临时节点 : 临时节点的生命周期和客户端的会话保持一致。当客户端会话失效,该节点自动清理

    临时有序节点 : 在临时节点上多勒一个顺序性特性

  2. 会话

  3. watcherzookeeper提供了分布式数据发布/订阅****,zookeeper****允许客户端向服务器注册一个watcher监听。当服务器端的节点触发指定事件的时候

    **会触发watcher。服务端会向客户端发送一个事件通知****watcher的通知是一次性,一旦触发一次通知后,该watcher就失效

  4. ACL
    zookeeper提供控制节点访问权限的功能,用于有效的保证zookeeper中数据的安全性。避免误操作而导致系统出现重大事故。

    CREATE /READ/WRITE/DELETE/ADMIN

zookeeper的命令操作

  1. create [-s] [-e] path data acl
    -s 表示节点是否有序
    -e 表示是否为临时节点
    默认情况下,是持久化节点

  2. get path [watch]
    获得指定 path的信息

  3. set path data [version]
    修改节点 path对应的data
    乐观锁的概念
    数据库里面有一个 version 字段去控制数据行的版本号

  4. delete path [version]
    删除节点

stat信息

cversion = 0 子节点的版本号
aclVersion = 0 表示acl的版本号,修改节点权限
dataVersion = 1 表示的是当前节点数据的版本号

czxid 节点被创建时的事务ID
mzxid 节点最后一次被更新的事务ID
pzxid 当前节点下的子节点最后一次被修改时的事务ID

ctime = Sat Aug 05 20:48:26 CST 2017
mtime = Sat Aug 05 20:48:50 CST 2017

java API操作zookeeper

demo实例我已经放在gitee上面。在zookeeper_demo的模块下面。

https://gitee.com/suwenguang/test

注意需要导入jar

<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.4.8</version>
</dependency>
package 操作zookeeper.javaAPI;import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;/*** author Veng Su* email  1344114844@qq.com* date   18-9-16 下午1:19*/
//这里实现watcher借口是为了方便构造zookeeper对象
public class APIdemo implements Watcher {public final static String CONNECTSTRING = "192.168.0.201:2181,192.168.0.203:2181,192.168.0.204:2181";public static CountDownLatch countDownLatch = new CountDownLatch(1);//计数器用于同步连接public static ZooKeeper zooKeeper;//zookeeper对象public static Stat stat = new Stat();//不给的全局,状态会话会丢失,导致执行失败public static void main(String[] args) throws Exception {APIdemo apIdemo = new APIdemo();zooKeeper = new ZooKeeper(CONNECTSTRING, 500, new APIdemo());//创建zookeeper对象countDownLatch.await();System.out.println(zooKeeper.getState());
//注意我都是写死的节点。便于学习
//        System.out.println(zooKeeper.getData("/suveng",apIdemo,stat));
//        apIdemo.delete();//这里是删除节点,注意要有才能删除
//        apIdemo.get(apIdemo);//这里是获取节点
//        apIdemo.create();//创建节点
//        apIdemo.set();//修改节点
//        Thread.sleep(2000);
//
//        Thread.sleep(1000);
//        zooKeeper.create("/suveng/sds", "dddd".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
//        System.out.println(zooKeeper.getChildren("/suveng",true));//        权限控制部分:ip,digest,world,super
//        zooKeeper.addAuthInfo("digest","root:root".getBytes());//第一种方式做权限ACL acl=new ACL(ZooDefs.Perms.CREATE, new Id("digest","root:root"));List<ACL> acls=new ArrayList<>();acls.add(acl);zooKeeper.create("/suveng/auth","2122".getBytes(),acls,CreateMode.PERSISTENT);//第二种方式做权限
//        zooKeeper.addAuthInfo("digest","root:root".getBytes());//创建新的客户端APIdemo apIdemo1=new APIdemo();ZooKeeper zooKeeper1=new ZooKeeper(CONNECTSTRING,5000,apIdemo1);zooKeeper1.getData("/suveng/auth",true,new Stat());}private void create() {//createString res = null;try {res = zooKeeper.create("/suveng", "hello,suwenguang".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);} catch (KeeperException | InterruptedException e) {e.printStackTrace();}System.out.println("create " + res);}private String set() throws KeeperException, InterruptedException {zooKeeper.setData("/suveng", "dddd".getBytes(), -1);return null;}private void get(APIdemo apIdemo) throws KeeperException, InterruptedException {zooKeeper.getData("/suveng", apIdemo, stat);}private void delete() throws KeeperException, InterruptedException {zooKeeper.delete("/suveng",-1);}//watcher 的实现方法@Overridepublic void process(WatchedEvent watchedEvent) {if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {if (watchedEvent.getType() == Event.EventType.None && watchedEvent.getPath() == null) {countDownLatch.countDown();System.out.println(watchedEvent.getState());} else if (watchedEvent.getType() == Event.EventType.NodeDataChanged) {try {System.out.println("path---->" + watchedEvent.getPath() + "  |data change--->" +zooKeeper.getData(watchedEvent.getPath(), true, stat));} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}} else if (watchedEvent.getType() == Event.EventType.NodeCreated) {try {System.out.println("path---->" + watchedEvent.getPath() + "  |created--->" +zooKeeper.getData(watchedEvent.getPath(), true, stat));} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}} else if (watchedEvent.getType() == Event.EventType.NodeDeleted) {try {System.out.println("path---->" + watchedEvent.getPath() + "  |data deleted--->" +zooKeeper.getData(watchedEvent.getPath(), true, stat));} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}} else if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) {try {System.out.println("path---->" + watchedEvent.getPath() + "  |data child changed--->" +zooKeeper.getData(watchedEvent.getPath(), true, stat));} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}}}
}

权限控制模式

schema:授权对象

ip : 192.168.1.1
Digest : username:password
world : 开放式的权限控制模式,数据节点的访问权限对所有用户开放。 world:anyone
super :超级用户,可以对zookeeper上的数据节点进行操作

连接状态

KeeperStat.Expired 在一定时间内客户端没有收到服务器的通知, 则认为当前的会话已经过期了。
KeeperStat.Disconnected 断开连接的状态
KeeperStat.SyncConnected 客户端和服务器端在某一个节点上建立连接,并且完成一次version、zxid同步
KeeperStat.authFailed 授权失败

事件类型

NodeCreated 当节点被创建的时候,触发
NodeChildrenChanged 表示子节点被创建、被删除、子节点数据发生变化
NodeDataChanged 节点数据发生变化
NodeDeleted 节点被删除
None 客户端和服务器端连接状态发生变化的时候,事件类型就是None

zkClient 操作zookeeper

注意:需要导入jar

zkClient只是简单的封装了Java的zookeeper API。但是总体比Java的要好用,支持递归创建和递归删除。订阅的时候也比较方便。

实例代码依然是放在了gitee上面。

package 操作zookeeper.zkclientdemo;import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.junit.Test;import java.util.ArrayList;
import java.util.List;/*** author Veng Su* email  1344114844@qq.com* date   18-9-16 下午4:24*/
public class Zkclientdemo {public final static String CONNECTSTRING = "192.168.0.201:2181,192.168.0.203:2181,192.168.0.204:2181";public static ZkClient geetInstance() {return new ZkClient(CONNECTSTRING, 5000);}@Testpublic void testcreate() {ZkClient zkClient = Zkclientdemo.geetInstance();zkClient.createEphemeral("/zktest1");ACL acl=new ACL(ZooDefs.Perms.CREATE, new Id("digest","root:root"));List<ACL> acls=new ArrayList<>();acls.add(acl);zkClient.createEphemeral("/zktest2",acls);zkClient.createEphemeral("/zktest3","324".getBytes(),acls);zkClient.create("/zktest","21".getBytes(), CreateMode.PERSISTENT);//递归创建zkClient.createPersistent("/digui0/digui1/digui2/digui3",true);System.out.println("success");}@Testpublic void testdelete() {ZkClient zkClient = Zkclientdemo.geetInstance();//普通删除zkClient.delete("/zktest");zkClient.delete("/zktest1");zkClient.delete("/zktest2");zkClient.delete("/zktest3");//递归删除zkClient.deleteRecursive("/digui0");boolean is = zkClient.exists("/suveng");System.out.println(is);}@Testpublic void testWatchers() throws InterruptedException {ZkClient zkClient = Zkclientdemo.geetInstance();zkClient.subscribeDataChanges("/suveng", new IZkDataListener() {@Overridepublic void handleDataChange(String s, Object o) throws Exception {System.out.println(s+"->"+o);}@Overridepublic void handleDataDeleted(String s) throws Exception {}});zkClient.writeData("/suveng","suwenguang");Thread.sleep(1000);}}

curator操作zookeeper

curator是netflix公司开源的。提供了各种使用场景的封装

curator-framework 提供了fluent 风格api

curator-replice 提供实现封装

fluent风格介绍。也就是链式操作如下

package 操作zookeeper.curator;import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;/*** author Veng Su* email  1344114844@qq.com* date   18-9-16 下午6:32*/
public class CuratorDemo {public final static String CONNECTSTRING = "192.168.0.201:2181,192.168.0.203:2181,192.168.0.204:2181";public static CuratorFramework getInstance() {return CuratorFrameworkFactory.builder().connectString(CONNECTSTRING).sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 5)).build();}//    测试创建@Testpublic void testCreate() throws Exception {CuratorFramework curatorFramework = CuratorDemo.getInstance();curatorFramework.start();curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/suveng/testcurator");}//    测试删除@Testpublic void testDelete() throws Exception {CuratorFramework curatorFramework = CuratorDemo.getInstance();curatorFramework.start();curatorFramework.delete().deletingChildrenIfNeeded()
//                .withVersion().forPath("/suveng/testcurator");}//测试getdata@Testpublic void testGet() throws Exception {CuratorFramework curatorFramework = CuratorDemo.getInstance();curatorFramework.start();Stat stat = new Stat();byte[] bytes = curatorFramework.getData().storingStatIn(stat).forPath("/suveng");System.out.println(new String(bytes));System.out.println("stat->" + stat);}//    测试setdata@Testpublic void testSet() throws Exception {CuratorFramework curatorFramework = CuratorDemo.getInstance();curatorFramework.start();curatorFramework.setData().forPath("/suveng", "knickknack".getBytes());}//测试异步@Testpublic void testasynchronous() throws Exception {CuratorFramework curatorFramework = CuratorDemo.getInstance();curatorFramework.start();ExecutorService executorService = Executors.newFixedThreadPool(1);CountDownLatch countDownLatch = new CountDownLatch(1);//创建临时节点curatorFramework.create().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {@Overridepublic void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {System.out.println(Thread.currentThread().getName() + "这是异步的回调函数" + "   result code->" + curatorEvent.getResultCode() + "" +"curator type" + curatorEvent.getType());countDownLatch.countDown();}}, executorService).forPath("/suveng/linshi");//不给其他线程默认是当前线程。countDownLatch.await();System.out.println(Thread.currentThread().getName() + "main线程");executorService.shutdown();}//事务@Testpublic void testTransaction() throws Exception {CuratorFramework curatorFramework = CuratorDemo.getInstance();curatorFramework.start();Collection<CuratorTransactionResult> curatorTransactions = curatorFramework.inTransaction().create().withMode(CreateMode.EPHEMERAL).forPath("/suveng/transaction").and().setData().forPath("/suveng", "sdfsdfsd".getBytes()).and().commit();for (CuratorTransactionResult result : curatorTransactions) {System.out.println(result.getForPath() + "->" + result.getType());}}//    测试watcher
//    patchcache 监视一个节点的子节点下的创建,删除,更新
//    nodecache 监视一个节点的创建,删除,更新
//    Treecache     patchcache + nodecache 合体 监视路径下的创建,删除,更新,并且缓存路径 所有子节点的数据@Testpublic void testWatcher() throws Exception {CuratorFramework curatorFramework = CuratorDemo.getInstance();curatorFramework.start();//        NodeCache nodeCache=new NodeCache(curatorFramework,"/suveng",false);
//        nodeCache.start();
//
//        nodeCache.getListenable().addListener(()-> System.out.println("节点数据发生改变,改变后的数据"+new String(nodeCache.getCurrentData().getData())));
//        curatorFramework.setData().forPath("/suveng","sdfasfasfda".getBytes());
//        Thread.sleep(2000);PathChildrenCache childrenCache = new PathChildrenCache(curatorFramework, "/suveng", true);childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);childrenCache.getListenable().addListener((curatorFramework1, pathChildrenCacheEvent) -> {switch (pathChildrenCacheEvent.getType()) {case CONNECTION_RECONNECTED:System.out.println("reconnetc");break;case CHILD_ADDED:System.out.println("child add");break;case CHILD_REMOVED:System.out.println("child remove");break;case CHILD_UPDATED:System.out.println("child update");break;default:break;}});//        curatorFramework.setData().forPath("/suveng","sdfsd".getBytes());//        curatorFramework.delete().forPath("/suveng/test");curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/suveng/test");TimeUnit.SECONDS.sleep(1);curatorFramework.setData().forPath("/suveng/test","sfs".getBytes());TimeUnit.SECONDS.sleep(1);}@Testpublic void testInit() {
//        创建会话的两种方式
//        第一种 normalCuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(CONNECTSTRING, 5000, 5000, new ExponentialBackoffRetry(1000, 3));curatorFramework.start();
//        第二种 fluentCuratorFramework curatorFramework2 = CuratorFrameworkFactory.builder().connectString(CONNECTSTRING).sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 5)).build();curatorFramework2.start();System.out.println("success ");}
}
private void makeFluent(Customer customer) {customer.newOrder().with(6, "TAL").with(5, "HPK").skippable().with(3, "LGV").priorityRush();
}

curator的重试策略

ExponentialBackoffRetry() 衰减重试
RetryNTimes 指定最大重试次数
RetryOneTime 仅重试一次
RetryUnitilElapsed 一直重试知道规定的时间

zookeeper使用相关推荐

  1. 常用的高性能 KV 存储 Redis、Memcached、etcd、Zookeeper 区别

    1. 什么是 KV 存储 KV 是 Key-Value 的缩写,KV 存储也叫键值对存储.简单来说,它是利用 Key 做索引来实现数据的存储.修改.查询和删除功能. 常用的高性能 KV 存储主要有 R ...

  2. kafka+zookeeper搭建步骤kafka问题

    kafka+zookeeper搭建步骤 帅气的名称被占用关注 0.1392018.12.04 13:48:00字数 1,007阅读 88 vmware 安装centOS7 克隆虚拟为:三台 本地你的I ...

  3. ZooKeeper简单使用

    ZooKeeper简单使用 ZooKeeper简单使用 1.ZooKeeper简介 2.ZooKeeper能做什么 3.ZooKeeper核心 3.1.ZooKeeper安装 3.2.ZooKeepe ...

  4. 2021年大数据ZooKeeper(六):ZooKeeper选举机制

    目录 ​​​​​​ZooKeeper选举机制 概念 全新集群选举 非全新集群选举 ZooKeeper选举机制 zookeeper默认的算法是FastLeaderElection,采用投票数大于半数则胜 ...

  5. 2021年大数据ZooKeeper(五):ZooKeeper Java API操作

    目录 ZooKeeper Java API操作 引入maven坐标 节点的操作 ZooKeeper Java API操作 这里操作Zookeeper的JavaAPI使用的是一套zookeeper客户端 ...

  6. 2021年大数据ZooKeeper(四):ZooKeeper的shell操作

    目录 ZooKeeper的shell操作 客户端连接 shell基本操作 操作命令 操作实例 节点属性 ​​​​​​​ZooKeeper Watcher(监听机制) ​​​​​​​Watch机制特点 ...

  7. 2021年大数据ZooKeeper(三):Zookeeper数据模型和节点类型

    目录 Apache ZooKeeper Zookeeper数据模型 Zookeeper节点类型 Apache ZooKeeper Zookeeper数据模型 图中的每个节点称为一个Znode. 每个Z ...

  8. 2021年大数据ZooKeeper(二):ZooKeeper集群搭建

    目录 ZooKeeper集群搭建 第一步:下载zookeeeper的压缩包,下载网址如下 第二步:解压 第三步:修改配置文件 第四步:添加myid配置 ​​​​​​​第五步:安装包分发并修改myid的 ...

  9. 2021年大数据ZooKeeper(一):ZooKeeper基本知识

    目录 Zookeeper基本知识 ZooKeeper概述 ZooKeeper特性 ZooKeeper集群角色 Leader: Follower: Observer: Zookeeper基本知识 Zoo ...

  10. ZooKeeper简介和概念知识

    1. 简介 ZooKeeper是一种分布式协调服务,用于管理大型主机.在分布式环境中协调和管理服务是一个复杂的过程. ZooKeeper通过其简单的架构和API解决了这个问题.ZooKeeper允许开 ...

最新文章

  1. 【vuejs深入三】vue源码解析之二 htmlParse解析器的实现
  2. 利用素数表快速寻找 n 以内的所有素数
  3. java定义基础变量语句_编程语言第一:JAVA语言基础,变量
  4. openlayers5之热力图heatmap
  5. python中属于无序序列的有_Python字典和集合属于无序序列。(2.0分)_学小易找答案...
  6. 【转】Laplace 算子
  7. node稳定版本_Node.js十年,你大爷还是你大爷
  8. 文件编辑vim常用命令
  9. [nssl 1322][jzoj cz 2109] 清兵线 {dp}
  10. 惠普台式机开不了机怎么办 惠普台式电脑无法开机的解决方法
  11. selinux基本概念 | 开启selinux策略 | 安全上下文的临时修改 | 安全上下文的永久修改 | 如何修复selinux | selinux对服务功能的影响 | 系统自动排错
  12. ArcGis Server10.2 授权文件教程
  13. 安卓手机免root权限恢复微信聊天记录(以vivo手机为例)
  14. PE系统下安装windows server 2003
  15. php打印出来乱码_PHP输出中文乱码怎么解决?
  16. STM8L051 同时使用RTC和USART通信
  17. Android 6.0 PM机制系列(四) APK安装需要空间分析
  18. web开发学习过程,一个合格的初级前端工程师需要掌握的模块笔记
  19. JavaScript实现字符串翻转
  20. 解决:adobe界面模糊且pdf文件模糊

热门文章

  1. C++数组求和用自带的库超级方便
  2. 纹章之谜一人攻略——英雄战争篇
  3. 什么是传输层协议TCP/UDP???
  4. oracle在分组内排序的方法,oracle 在分组内排序的方法(转载)
  5. Unity Shader入门精要第七章 基础纹理 凹凸映射之在世界空间下计算
  6. PHP 7.4 新特性
  7. elasticsearch 乐观锁
  8. PostgreSQL参数学习:wal_keep_segments
  9. dalvik和鸿蒙,ART与Dalvik哪个好用 ART模式和Dalvik模式区别对比分析
  10. BUUCTF pwn