zookeeper

安装:

直接解压即可

1.在zookeeper的解压目录中进行如下操作

#此时拷贝文件
cd conf/
# 因为zookeeper默认使用的配置文件名称是zoo.cfg,否则启动不了
# 如果非root用户需要修改dataDir这个配置
cp zoo_sample.cfg zoo.cfg
cd bin/
# 查看可用命令
sh zkServer.sh xxx
#{start|start-foreground|stop|restart|status|print-cmd}
#启动zookeeper服务
sh zkServer.sh start
#客户端连接
sh ./zkCli.sh -server 127.0.0.1:2181

数据模型

zookeeper的数据模型和文件系统类似:每一个节点称为:znode. 是zookeeper中的最小数据单元。每一个 znode上都可以保存数据和挂载子节点。 从而构成一个层次化的属性结构

节点特性

持久化节点 : 节点创建后会一直存在zookeeper服务器上,直到主动删除

持久化有序节点 :每个节点都会为它的一级子节点维护一个顺序

临时节点 : 临时节点的生命周期和客户端的会话保持一致。当客户端会话失效,该节点自动清理

临时有序节点 : 在临时节点上多一个顺序性特性

zoo.cfg配置文件分析

tickTime=2000 zookeeper中最小的时间单位长度 (ms)

initLimit=10 follower节点启动后与leader节点完成数据同步的时间

syncLimit=5 leader节点和follower节点进行心跳检测的最大延时时间

dataDir=/tmp/zookeeper 表示zookeeper服务器存储快照文件的目录

dataLogDir 表示配置 zookeeper事务日志的存储路径,默认指定在dataDir目录

clientPort 表示客户端和服务端建立连接的端口号: 2181

命令集合

    stat path [watch]set path data [version]ls path [watch]delquota [-n|-b] pathls2 path [watch]setAcl path aclsetquota -n|-b val pathhistory redo cmdnoprintwatches on|offdelete path [version]sync pathlistquota pathrmr pathget path [watch]create [-s] [-e] path data acladdauth scheme authquit getAcl pathclose connect host:port

1.创建节点:

create [-s] [-e] path data acl
# -e表示创建临时节点,会自动删除
# -s表示创建顺序节点
# data 节点数据
# acl 权限
**************************************实例 1*************************************
#设置node节点的数据值为123
create /nodes 123
#获取node节点内容:get path [watch] 设置一个watcher,触发器
get /nodes
    123
    cZxid = 0x6
    ctime = Sat Apr 28 09:23:46 CST 2018
    mZxid = 0x7
    mtime = Sat Apr 28 09:24:03 CST 2018
    pZxid = 0x6
    cversion = 0
    dataVersion = 1
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 3
    numChildren = 0
get /nodes watch #当数据修改或者删除的时候会触发此watcher,仅仅生效一次
#修改节点:set path data [-v version]
set /nodes 2222
#删除节点
delete path [-v version]

2.state信息的含义描述

cZxid = 0x6
ctime = Sat Apr 28 09:23:46 CST 2018
mZxid = 0x8
mtime = Sat Apr 28 09:29:24 CST 2018
pZxid = 0x6
cversion = 0
dataVersion = 2
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 3
numChildren = 0

cversion = 0 子节点的版本号
aclVersion = 0 表示acl的版本号,修改节点权限
dataVersion = 1 表示的是当前节点数据的版本号,锁有关

czxid 节点被创建时的事务ID
mzxid 节点最后一次被更新的事务ID
pzxid 当前节点下的子节点最后一次被修改时的事务ID

ctime = Sat Aug 05 20:48:26 CST 2017
mtime = Sat Aug 05 20:48:50 CST 2017

ephemeralOwner = 0x0 创建临时节点的时候,会有一个sessionId 。 该值存储的就是这个sessionid

3.什么是watcher

zookeeper提供了分布式数据发布/订阅**,zookeeper允许客户端向服务器注册一个watcher监听。当服务器端的节点触发指定事件的时候会触发watcher。服务端会向客户端发送一个事件通知watcher的通知是一次性,一旦触发一次通知后,该watcher就失效

#父节点
get /node watch
#子节点 ,通过ls给父节点设置watch,在子节点创建,删除的时候会触发,但是修改不会触发
ls /parent watch
#修改触发watch
get /parent/child watch

acl

zookeeper提供控制节点访问权限的功能,用于有效的保证zookeeper中数据的安全性。避免误操作而导致系统出现重大事故。

权限相关的命令:

getAcl /node #获取某一个节点的权限
setAcl /node #设置权限
#输入认证授权信息,即注册用户
addauth
#添加用户名和密码,
addauth digest user:user

zk中acl的组成:

[scheme : id : permissions]

1.scheme :代表采用某种机制字符串

​ world->只有一个id,anyone,展现形式:world:anyone:[permissons]

​ auth 代表认证登录需要注册用户才能登录的,展现形式:auth:username:pass:[permissions]

​ digest 对密码加密 digest:username:BASE64(SHA1(password)):[permissons]

​ ip:ip机制的字符串,表示只有某个ip才能进行访问 ip:192.168.135.1:[permissions]

​ super:超级管理员,拥有所权限,需要修改文档进行配置

2.id :表示允许访问的用户

3.permission:权限组合字符串

getAcl /nodes'world,'anyone: cdrwa

CREATE /READ/WRITE/DELETE/ADMIN

4.设置权限

# world机制权限设置
setAcl /nodes world:anynone:crw
# auth 和auth#1.注册用户,采用digest模式,使得密码加密,addauth digest admin:admin#设置权限setAcl /mic auth:admin:admin:crdgetAcl /mic'digest,'user:/LuTKDRdjX+6xkBvju9H7KC44Ao=: rcd

super权限,修改配置文件

vi zkServer.sh
#添加如下的内容,password需要加密
nohup "$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}"  "-Dzookeeper.DigestAuthenticationProvider.superDigest=user:password"
-cp "$CLASSPATH" $JVMFLAGS $ZOOMAIN "$ZOOCFG" > "$_ZOO_DAEMON_OUT" 2>&1 < /dev/null &

5.zk的四字命令

yum install nc -y

设置四字命令白名单:

vi conf/zoo.cfg

4lw.commands.whitelist=*

集群搭建

注意点:需要开发响应的端口,或者你可以直接挂你防火墙

>>> 开启端口
firewall-cmd --zone=public --add-port=80/tcp --permanent命令含义:
--zone #作用域
--add-port=80/tcp #添加端口,格式为:端口/通讯协议
--permanent #永久生效,没有此参数重启后失效
>>> 重启防火墙
firewall-cmd --reload

1.vi conf/zoo.cfg

添加如下内容:

server.id=host:port1:port2

​ port1:各个zookeeper的端口号,可以相同,也可以不相同

​ port2:各个zookeeper之间进行通信的端口号,

2.vi /tmp/zookeeper/myid 编辑此文件,增加server.id中的id

3.示例

在机器:192.168.135.140上面

vim conf/zoo.cfg
#在页尾添加server.1=192.168.135.140:2188:3188server.2=192.168.135.141:2188:3188server.3=192.168.135.142:2188:3188vim /tmp/zookeeper/myid
#添加一个11

在机器:192.168.135.141上面

vim conf/zoo.cfg
#在页尾添加server.1=192.168.135.140:2188:3188server.2=192.168.135.141:2188:3188server.3=192.168.135.142:2188:3188vim /tmp/zookeeper/myid
#添加一个22

在机器:192.168.135.142上面,进行相同的操作,

查看节点状态:./zkServer.sh status

伪集群的搭建

cp zookeeper zookeeper02 -rf
cp zookeeper zookeeper03 -rf

然后配置和集群相似,但是因为在同一个机器上面,所以端口号不能一致

java 原生api

导入jar

<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.4.11</version>
</dependency>
1.创建连接
package com.itcloud.connect;import java.io.IOException;
import java.util.concurrent.TimeUnit;import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;public class ZkConnection implements Watcher {// 集群连接地址private static final String CONNECT_ADDRESS = "192.168.135.140:2180,192.168.135.140:2182,192.168.135.143:2183";private static final int TIMEOUT = 5000;public static void main(String[] args) throws Exception {//创建连接ZooKeeper zk = new ZooKeeper(CONNECT_ADDRESS, TIMEOUT, new ZkConnection());System.out.println("开始连接");System.out.println("连接状态..." + zk.getState());TimeUnit.SECONDS.sleep(2);          //此处睡眠两秒目的是等待连接完成System.out.println("连接状态2..." + zk.getState());}@Overridepublic void process(WatchedEvent event) {System.out.println("时间的监听...." + event.getState());}}
2.创建节点
public class ZkCreateNode implements Watcher {private static ZooKeeper zk = null;private static final String CONNECT_ADDRESS = "192.168.135.140:2180,192.168.135.140:2182,192.168.135.143:2183";private static final int TIMEOUT = 5000;public ZkCreateNode() {}public ZkCreateNode(String connectString) {try {zk = new ZooKeeper(connectString, TIMEOUT, new ZkCreateNode());} catch (IOException e) {e.printStackTrace();if (zk != null) {try {zk.close();} catch (InterruptedException e1) {e1.printStackTrace();}}}}public void createNode(String path, byte[] data, List<ACL> acls) {String result = "";try {// CreateMode.PERSISTENT 表示创建永久节点// result = zk.create(path, data, acls, CreateMode.PERSISTENT);String ctx = "{create:success}";zk.create(path, data, acls, CreateMode.PERSISTENT, (int rc, String paths, Object ctxs, String name) -> {System.out.println("*********************");System.out.println("节点名称" + path);System.out.println((String) ctx);System.out.println(rc);System.out.println(name);System.out.println("*********************");}, ctx);System.out.println("创建节点 \t" + result);TimeUnit.SECONDS.sleep(2);} catch (Exception e) {e.printStackTrace();}}public static void main(String[] args) {ZkCreateNode zcn = new ZkCreateNode(CONNECT_ADDRESS);zcn.createNode("/java2", "javaapi2".getBytes(), Ids.OPEN_ACL_UNSAFE);}@Overridepublic void process(WatchedEvent event) {}
}
3.修改节点:
        ZkCreateNode zcn = new ZkCreateNode(CONNECT_ADDRESS);       try {Stat stat = zcn.getZk().setData("/java2", "changejava2".getBytes(), 0);System.out.println(stat.getVersion());} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}
6.删除节点:
/**************************** 节点删除 **************************************/// 同步删除// try {// zcn.getZk().delete("/java", 0);// } catch (InterruptedException | KeeperException e) {// // TODO Auto-generated catch block// e.printStackTrace();// }// 同步删除,没有返回值,用户体验不是很好,异步删除,可以显示删除之后的信息zcn.getZk().delete("/nodes", 0, (int rc, String path, Object ctx) -> {System.out.println(rc);System.out.println(path);System.out.println((String) ctx);}, "hello");try {//异步要等待删除TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}
7.java.util.concurrent.CountDownLatch的使用
7.1数据查找的时候使用
public class ZkGetData implements Watcher {private static CountDownLatch count = new CountDownLatch(1);private static CountDownLatch countDown = new CountDownLatch(1);private ZooKeeper zookeeper = null;private static Stat stat = new Stat();private static final String CONNECT_ADDRESS = "192.168.135.140:2180,192.168.135.140:2182,192.168.135.143:2183";private static final int TIMEOUT = 5000;public ZkGetData() {}public ZkGetData(String connectString) {try {zookeeper = new ZooKeeper(connectString, TIMEOUT, new ZkGetData());try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}} catch (IOException e) {e.printStackTrace();if (zookeeper != null) {try {zookeeper.close();} catch (InterruptedException e1) {e1.printStackTrace();}}}}public void getData() {try {/*** param1 node name param2 if true,listen zookeeper's changes param3 currentNode* stat*/zookeeper.getData("/java2", true, stat);} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}try {count.await();} catch (InterruptedException e) {e.printStackTrace();}}public static void main(String[] args) {ZkGetData zkGetData = new ZkGetData(CONNECT_ADDRESS);try {byte[] data = zkGetData.getZookeeper().getData("/java2", true, stat);System.out.println("在次数获取内容?" + new String(data));// 如果count不减到0的话,就会一直等待count.await();} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}@Overridepublic void process(WatchedEvent event) {if (event.getType() == EventType.NodeDataChanged) { //节点内容改变的时候触发此事件
//          this.zookeeper = new ZkGetData(CONNECT_ADDRESS);try {byte[] data = this.zookeeper.getData("/java2", false, stat);System.out.println("修改之后的值" + new String(data) + "版本号:" + stat.getVersion());} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}count.countDown();}}public ZooKeeper getZookeeper() {return zookeeper;}public void setZookeeper(ZooKeeper zookeeper) {this.zookeeper = zookeeper;}
}
获取子节点
//同步获取子节点
List<String> childrenNodes = zkGetData.getZookeeper().getChildren("/java2", true);
// stat可以获取到当前父节点的状态
zkGetData.getZookeeper().getChildren("/java2", true, stat);
//异步获取子节点
zkGetData.getZookeeper().getChildren("/java2", true, (int rc, String path, Object ctx, List<String> children, Stat stat) -> {
//TODO
}, "成功回调");
判断节点是否存在
zkGetData.getZookeeper().exists(path, watch);
权限认证

​ 在设置的时候可以通Ids设置简单的全新

zcn.createNode("/java2", "javaapi2".getBytes(), Ids.OPEN_ACL_UNSAFE);

​ 自定义权限

ZkCreateNode zcn = new ZkCreateNode(CONNECT_ADDRESS);
/**************************** 创建节点 **************************************/
List<ACL> acls = new ArrayList<>();
//创建用户
Id user1 = new Id("digest", DigestAuthenticationProvider.generateDigest("user1:user1"));
Id user2 = new Id("digest", DigestAuthenticationProvider.generateDigest("user2:user2"));
acls.add(new ACL(Perms.ALL, user1)); //user1赋予所有权限
acls.add(new ACL(Perms.READ | Perms.DELETE, user2)) ;
zcn.createNode("/java2acl", "acl".getBytes(), acls);
//用户注册登录,对有权限的节点进行操作
zcn.getZk().addAuthInfo("digest", "user2:user2".getBytes());
//TODO 对节点进行相关操作

curator客户端的使用

创建会话连接

public class ZkCuratorConnect {private CuratorFramework client = null;private static final String ADDRESS = "192.168.135.140:2180,192.168.135.140:2182,192.168.135.140:2183";public ZkCuratorConnect() {RetryPolicy retryPolicy = new RetryNTimes(3, 5000);client = CuratorFrameworkFactory.builder().connectString(ADDRESS)         .sessionTimeoutMs(10000).retryPolicy(retryPolicy).namespace("workspace").build();client.start();}public void close(){if(this.client !=null) {this.client.close();}}public CuratorFramework getClient() {return client;}public static void main(String[] args) {ZkCuratorConnect connect = new ZkCuratorConnect();boolean flag = connect.getClient().isStarted();System.out.println(flag); //trueconnect.getClient().close();}}

操作节点

1.节点的创建

        ZkCuratorConnect connect = new ZkCuratorConnect();/*** 创建节点*/String path = "/super/child"; //这里curator客户端自动帮助递归connect.getClient().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(Ids.OPEN_ACL_UNSAFE).forPath(path, "curator".getBytes());TimeUnit.SECONDS.sleep(2);connect.getClient().close();

2.节点查询

Stat stat = new Stat();
byte[] data = connect.getClient().getData().storingStatIn(stat).forPath("/super/child");
System.out.println("获取的数据" + new String(data) + "节点状态:" + stat);

3.修改节点数据

Stat stat = connect.getClient().setData().withVersion(0).forPath("/super", "HELLO WORLD".getBytes());
System.out.println("修改后的stat" + stat);

4.删除节点

connect.getClient().delete().guaranteed() // 如果失败,后台还会一直删除,直到成功为止.deletingChildrenIfNeeded() // 如果有子节点,那么子节点也一并删除.withVersion(1).forPath("/super");

5.获取子节点

List<String> children = connect.getClient().getChildren().forPath("/super");if (children.size() != 0) {for(String child : children) {System.out.print(child + "\t");}}

6.判断节点是否存在

/**
* 根据Stat实例是否为空判断,是否存在该节点
*/
Stat stat = connect.getClient().checkExists().forPath("/superss");
System.out.println();

7.watch监听

​ 监听一次就失效

connect.getClient().getData().usingWatcher(new CuratorWatcher() {@Overridepublic void process(WatchedEvent event) throws Exception {if (event.getType() == EventType.NodeDataChanged) {System.out.println("节点改变后的值" + new String(connect.getClient().getData().forPath("/super")) );}}
}).forPath("/super");

​ 永久监听

        /*** 节点监听,永久有效,   监听节点的删除,更新*/final NodeCache node = new NodeCache(connect.getClient(), "/super"); // 创建要缓存的节点node.start(true); // 初始化,获取node的值,并且缓存if (node.getCurrentData() != null) {System.out.println("当前节点的值" + new String(node.getCurrentData().getData()));} else {System.out.println("节点不存在");return;}node.getListenable().addListener(() -> {if (node.getCurrentData() == null) {System.out.println("节点已经被删除");return;}System.out.println("节点路径" + node.getCurrentData().getPath() + "节点数据" + new String(node.getCurrentData().getData()));});

8.监听子节点

String childrenNodeParent = "/super";final PathChildrenCache cache = new PathChildrenCache(connect.getClient(), childrenNodeParent, true);/*** StartMode: 初始化方式*  POST_INITIALIZED_EVENT:异步初始化,初始化之后会触发事件 *  NORMAL:异步初始化* BUILD_INITIAL_CACHE:同步初始化*/cache.start(StartMode.NORMAL);List<ChildData> childrenNode = cache.getCurrentData();if (childrenNode.size() == 0) {System.out.println("子节点不存在");return;}cache.getListenable().addListener((CuratorFramework client, PathChildrenCacheEvent event) -> {if(event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)){System.out.println("子节点初始化ok...");}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)){String path = event.getData().getPath();if (path.equals("/super/child/d")) {System.out.println("添加子节点:" + event.getData().getPath());System.out.println("子节点数据:" + new String(event.getData().getData()));} else if (path.equals("/super/imooc/e")) {System.out.println("添加不正确...");}}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){System.out.println("删除子节点:" + event.getData().getPath());}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){System.out.println("修改子节点路径:" + event.getData().getPath());System.out.println("修改子节点数据:" + new String(event.getData().getData()));}});

完整代码

package com.itcloud.zkcurator.connect;import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;public class ZkCuratorConnect {private CuratorFramework client = null;private static final String ADDRESS = "192.168.135.140:2180,192.168.135.140:2182,192.168.135.140:2183";public ZkCuratorConnect() {RetryPolicy retryPolicy = new RetryNTimes(3, 5000);client = CuratorFrameworkFactory.builder().connectString(ADDRESS).sessionTimeoutMs(10000).retryPolicy(retryPolicy).namespace("workspace").build();client.start();}public void close() {if (this.client != null) {this.client.close();}}public CuratorFramework getClient() {return client;}public static void main(String[] args) throws Exception {ZkCuratorConnect connect = new ZkCuratorConnect();/*** 创建节点*//** String path = "/super/child"; // 这里curator客户端自动帮助递归* connect.getClient().create().creatingParentsIfNeeded().withMode(CreateMode.* PERSISTENT) .withACL(Ids.OPEN_ACL_UNSAFE).forPath(path,* "curator".getBytes());*//*** 节点查询*//** Stat stat = new Stat(); byte[] data =* connect.getClient().getData().storingStatIn(stat).forPath("/super/child");* System.out.println("获取的数据" + new String(data) + "节点状态:" + stat);*//*** 修改节点数据*//** Stat stat = connect.getClient().setData().withVersion(0).forPath("/super",* "HELLO WORLD".getBytes());* * System.out.println("修改后的stat" + stat);*//*** 删除节点*//** connect.getClient().delete().guaranteed() // 如果失败,后台还会一直删除,直到成功为止* .deletingChildrenIfNeeded() // 如果有子节点,那么子节点也一并删除* .withVersion(1).forPath("/super");*//*** 递归查询子节点*//** List<String> children = connect.getClient().getChildren().forPath("/super");* if (children.size() != 0) { for(String child : children) {* System.out.print(child + "\t"); } }*//*** 根据Stat实例是否为空判断,是否存在该节点*//** Stat stat = connect.getClient().checkExists().forPath("/superss");* System.out.println();*//*** 节点监听,一次失效*//** connect.getClient().getData().usingWatcher(new CuratorWatcher() {* * @Override public void process(WatchedEvent event) throws Exception { if* (event.getType() == EventType.NodeDataChanged) { System.out.println("节点改变后的值"* + new String(connect.getClient().getData().forPath("/super")) ); } }* }).forPath("/super");*//*** 节点监听,永久有效, 监听节点的删除,更新*//** final NodeCache node = new NodeCache(connect.getClient(), "/super"); //* 创建要缓存的节点 node.start(true); // 初始化,获取node的值,并且缓存* * if (node.getCurrentData() != null) { System.out.println("当前节点的值" + new* String(node.getCurrentData().getData()));* * } else { System.out.println("节点不存在"); return; }* node.getListenable().addListener(() -> { if (node.getCurrentData() == null) {* System.out.println("节点已经被删除"); return; } System.out.println( "节点路径" +* node.getCurrentData().getPath() + "节点数据" + new* String(node.getCurrentData().getData())); });*/String childrenNodeParent = "/super";final PathChildrenCache cache = new PathChildrenCache(connect.getClient(), childrenNodeParent, true);/*** StartMode: 初始化方式*  POST_INITIALIZED_EVENT:异步初始化,初始化之后会触发事件 *  NORMAL:异步初始化* BUILD_INITIAL_CACHE:同步初始化*/cache.start(StartMode.NORMAL);List<ChildData> childrenNode = cache.getCurrentData();if (childrenNode.size() == 0) {System.out.println("子节点不存在");return;}cache.getListenable().addListener((CuratorFramework client, PathChildrenCacheEvent event) -> {if(event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)){System.out.println("子节点初始化ok...");}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)){String path = event.getData().getPath();if (path.equals("/super/child/d")) {System.out.println("添加子节点:" + event.getData().getPath());System.out.println("子节点数据:" + new String(event.getData().getData()));} else if (path.equals("/super/imooc/e")) {System.out.println("添加不正确...");}}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){System.out.println("删除子节点:" + event.getData().getPath());}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){System.out.println("修改子节点路径:" + event.getData().getPath());System.out.println("修改子节点数据:" + new String(event.getData().getData()));}});TimeUnit.SECONDS.sleep(200);connect.getClient().close();}}

权限管理

public class ZkCuratorACL {private CuratorFramework client = null;private static final String ADDRESS = "192.168.135.140:2180,192.168.135.140:2182,192.168.135.140:2183";public ZkCuratorACL() {RetryPolicy retryPolicy = new RetryNTimes(3, 5000);//在客户端创建的时候要进行认证client = CuratorFrameworkFactory.builder().connectString(ADDRESS).sessionTimeoutMs(10000).retryPolicy(retryPolicy).namespace("workspace").authorization("digest", "user1:admin".getBytes()).build();client.start();}public void close() {if (this.client != null) {this.client.close();}}public CuratorFramework getClient() {return client;}public static void main(String[] args) throws Exception {ZkCuratorACL curatorACL = new ZkCuratorACL();List<ACL> acls = new ArrayList<>();Id user1 = new Id("digest", DigestAuthenticationProvider.generateDigest("user1:admin"));Id user2 = new Id("digest", DigestAuthenticationProvider.generateDigest("user2:admin"));ACL acl1 = new ACL(Perms.ALL, user1);ACL acl2 = new ACL(Perms.CREATE|Perms.DELETE, user2);acls.add(acl1);acls.add(acl2);//      curatorACL.getClient().create().creatingParentsIfNeeded()
//          .withMode(CreateMode.PERSISTENT)
//          .withACL(acls, true) //true 表示父节点也会设置相同的权限
//          .forPath("/acl_test/aa", "hello acl".getBytes());curatorACL.getClient().setACL().withACL(acls).forPath("/acl_test/bb");}
}

继续。。。。。。

zookeeper的使用相关推荐

  1. 常用的高性能 KV 存储 Redis、Memcached、etcd、Zookeeper 区别

    1. 什么是 KV 存储 KV 是 Key-Value 的缩写,KV 存储也叫键值对存储.简单来说,它是利用 Key 做索引来实现数据的存储.修改.查询和删除功能. 常用的高性能 KV 存储主要有 R ...

  2. kafka+zookeeper搭建步骤kafka问题

    kafka+zookeeper搭建步骤 帅气的名称被占用关注 0.1392018.12.04 13:48:00字数 1,007阅读 88 vmware 安装centOS7 克隆虚拟为:三台 本地你的I ...

  3. ZooKeeper简单使用

    ZooKeeper简单使用 ZooKeeper简单使用 1.ZooKeeper简介 2.ZooKeeper能做什么 3.ZooKeeper核心 3.1.ZooKeeper安装 3.2.ZooKeepe ...

  4. 2021年大数据ZooKeeper(六):ZooKeeper选举机制

    目录 ​​​​​​ZooKeeper选举机制 概念 全新集群选举 非全新集群选举 ZooKeeper选举机制 zookeeper默认的算法是FastLeaderElection,采用投票数大于半数则胜 ...

  5. 2021年大数据ZooKeeper(五):ZooKeeper Java API操作

    目录 ZooKeeper Java API操作 引入maven坐标 节点的操作 ZooKeeper Java API操作 这里操作Zookeeper的JavaAPI使用的是一套zookeeper客户端 ...

  6. 2021年大数据ZooKeeper(四):ZooKeeper的shell操作

    目录 ZooKeeper的shell操作 客户端连接 shell基本操作 操作命令 操作实例 节点属性 ​​​​​​​ZooKeeper Watcher(监听机制) ​​​​​​​Watch机制特点 ...

  7. 2021年大数据ZooKeeper(三):Zookeeper数据模型和节点类型

    目录 Apache ZooKeeper Zookeeper数据模型 Zookeeper节点类型 Apache ZooKeeper Zookeeper数据模型 图中的每个节点称为一个Znode. 每个Z ...

  8. 2021年大数据ZooKeeper(二):ZooKeeper集群搭建

    目录 ZooKeeper集群搭建 第一步:下载zookeeeper的压缩包,下载网址如下 第二步:解压 第三步:修改配置文件 第四步:添加myid配置 ​​​​​​​第五步:安装包分发并修改myid的 ...

  9. 2021年大数据ZooKeeper(一):ZooKeeper基本知识

    目录 Zookeeper基本知识 ZooKeeper概述 ZooKeeper特性 ZooKeeper集群角色 Leader: Follower: Observer: Zookeeper基本知识 Zoo ...

  10. ZooKeeper简介和概念知识

    1. 简介 ZooKeeper是一种分布式协调服务,用于管理大型主机.在分布式环境中协调和管理服务是一个复杂的过程. ZooKeeper通过其简单的架构和API解决了这个问题.ZooKeeper允许开 ...

最新文章

  1. pytorch .item_pytorch + SGD
  2. rabbitmq用户权限管理
  3. c++学习笔记之构造函数
  4. 2019年, VQA论文汇总
  5. 2017/06/23 linux软件管理构建本地源
  6. Mac OS安装octave出现的问题-'error:terminal type set to 'unknown'的解决'
  7. 2000年考研英语阅读理解文章五
  8. python自编信息加密函数_自定义Python加密算法
  9. 【高级】分表和分区的区别、分库分表介绍与区别
  10. python内置函数map/reduce/filter
  11. Activiti7整合SpringBoot
  12. OMNeT++下载、安装及实例tictoc1-tictoc18
  13. python mysqldb_python mysqldb 教程
  14. HLA高层体系结构+RTI(2)
  15. 微信公众号手机端无法打开,但是微信官方调试工具和电脑端可以打开
  16. Reliability, Availability, Serviceability (RAS) 介绍
  17. r语言C指数的置信区间,用R语言求置信区间
  18. [官方Flink入门笔记 ] 三、开发环境搭建和应用的配置、部署及运行
  19. JavaScript mongodb(数据库)简单值
  20. 东南大学银行账号信息

热门文章

  1. Flash小玩意图案创作:demo(一)
  2. pta 念数字(C语言实现)
  3. Word控件Spire.Doc 转换教程(十五):在 C# 中将 HTML 文件转换为 PDF 和 XPS
  4. 新闻短视频制作,你了解多少?
  5. 计算机毕业设计 SSM在线药品购物商城系统(源码+论文)
  6. twitter最近很火,来一个twitter与个互联网媒体的比较转
  7. 推荐一个Google Analytics的开源平替
  8. 如何在与 WPF 文本框的触摸交互中显示触摸键盘
  9. 自然语言处理三大特征抽取器(CNN/RNN/TF)比较
  10. 创意前沿:30个别具创意的科技广告欣赏