1、zkCli客户端操作

1.1 连接服务器

通过 sh zkCli.sh 命令连接 zookeeper 客户端服务器。
当出现 WatchedEvent state:SyncConnected type:None path:null 表示成功连接上了zookeeper服务器

[root@k8smaster bin]# pwd
/opt/zookeeper-3.5.5/bin
[root@k8smaster bin]# sh zkCli.sh
...
WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] 

上面默认是连接本地的2181端口的zookeeper服务器,如果希望连接指定的 zookeeper 服务器,可以通过如下方式实现:
sh zkCli.sh -server ip:port

1.2 创建节点 create

create [-s] [-e] path [data] [acl]

其中,-s表示顺序节点,-e表示临时节点,acl访问控制列表。默认情况下,即不添加-s 或- e 参数的,创建的是持久节点。

[zk: localhost:2181(CONNECTED) 0] create /firstNode 123
Created /firstNode
[zk: localhost:2181(CONNECTED) 1]

执行完上面的命令,就在 zookeeper 的根节点下创建了一个叫作 /firstNode 的节点,并且节点的数据内容是“123”。另外,create 命令的最后一个参数是acl ,它是用来进行权限控制的,缺省情况下,不做任何权限控制。

1.3 读取子节点 ls

使用 ls 命令,可以列出ZooKeeper指定节点下的所有子节点。当然,这个命令只能看到指定节点下第一级的所有子节点。
ls [-s] [-w] [-R] path
-s表示获取节点信息,包括时间戳、版本号、数据大小等信息。
-w 设置 Watch监听器
-R 递归显示
path 表示的是指定数据节点的节点路径。
执行如下命令:

[zk: localhost:2181(CONNECTED) 1] ls /
[firstNode, zookeeper]
[zk: localhost:2181(CONNECTED) 2]

第一次部署的 ZooKeeper 集群,默认在根节点“/”下面有一个叫作 /zookeeper 的保留节点。

1.4 读取数据内容 get

get [-s] [-w] path

-s表示获取节点信息,包括时间戳、版本号、数据大小等信息。
-w 设置 Watch监听器

读取 /firstNode 的数据内容,-s表示获取节点信息,可以看到数据版本 dataVersion 为0。

[zk: localhost:2181(CONNECTED) 3] get /firstNode
123
[zk: localhost:2181(CONNECTED) 4] get -s /firstNode
123
cZxid = 0x60000000c
ctime = Mon Dec 21 15:11:06 CST 2020
mZxid = 0x60000000c
mtime = Mon Dec 21 15:11:06 CST 2020
pZxid = 0x60000000c
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 3
numChildren = 0
[zk: localhost:2181(CONNECTED) 5] 

1.5 更新 set

set [-s] [-v version] path data
-s表示节点为顺序节点,-v 指定版本号,data 就是要更新的新内容。
在 zookeeper 中 ,节点的数据是有版本概念的,这个参数用于指定本次更新操作是基于 ZNode 的哪一个数据版本进行的。

设置 /firstNode 节点数据 456,再次通过 get -s 查看节点信息,看到数据内容由 123 变成了 456,数据版本 dataVersion 为1。

[zk: localhost:2181(CONNECTED) 5] set /firstNode 456
[zk: localhost:2181(CONNECTED) 6] get -s /firstNode
456
cZxid = 0x60000000c
ctime = Mon Dec 21 15:11:06 CST 2020
mZxid = 0x60000000f
mtime = Mon Dec 21 15:20:08 CST 2020
pZxid = 0x60000000c
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 3
numChildren = 0
[zk: localhost:2181(CONNECTED) 7]

1.6 获取节点的访问控制列表 getAcl

getAcl [-s] [-w] path

-s表示获取节点信息,包括时间戳、版本号、数据大小等信息。
-w 设置 Watch监听器

[zk: localhost:2181(CONNECTED) 11] getAcl /firstNode
'world,'anyone
: cdrwa
[zk: localhost:2181(CONNECTED) 12] getAcl -s /firstNode
'world,'anyone
: cdrwa
cZxid = 0x60000000c
ctime = Mon Dec 21 15:11:06 CST 2020
mZxid = 0x60000000f
mtime = Mon Dec 21 15:20:08 CST 2020
pZxid = 0x60000000c
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 3
numChildren = 0
[zk: localhost:2181(CONNECTED) 13] 

上面 : cdrwa 表示权限控制列表

  • c(CREATE) : 创建子节点的权限。
  • r(READ) : 获取节点数据和子节点列表的权限。
  • w(WRITE) : 更新节点数据的权限。
  • d(DELETE) : 删除子节点的权限。
  • a(ADMIN) : 设置节点 ACL 的权限。

1.7 设置节点的访问控制列表 setAcl

setAcl [-s] [-v version] [-R] path acl

-s表述顺序节点,-v指定版本号,-R递归设置,acl权限列表。

1.8 删除节点 delete

delete [-v version] path

删除某一节点,只能删除无子节点的节点。-v 表示节点版本号

[zk: localhost:2181(CONNECTED) 40] create /firstNode1/nodeB cde
Created /firstNode1/nodeB
[zk: localhost:2181(CONNECTED) 41] create /firstNode1/nodeC fgh
Created /firstNode1/nodeC
[zk: localhost:2181(CONNECTED) 42] ls -R /firstNode1
/firstNode1
/firstNode1/nodeA
/firstNode1/nodeB
/firstNode1/nodeC
[zk: localhost:2181(CONNECTED) 43] 

删除 /firstNode1/nodeC

[zk: localhost:2181(CONNECTED) 43] delete /firstNode1/nodeC
[zk: localhost:2181(CONNECTED) 44] ls -R /firstNode1
/firstNode1
/firstNode1/nodeA
/firstNode1/nodeB
[zk: localhost:2181(CONNECTED) 45]

1.9 删除所有 deleteall

deleteall path
递归的删除某一节点及其子节点

删除 /firstNode1 及其子节点

[zk: localhost:2181(CONNECTED) 47] ls -R /firstNode1
/firstNode1
/firstNode1/nodeA
/firstNode1/nodeB
[zk: localhost:2181(CONNECTED) 48] deleteall /firstNode1
[zk: localhost:2181(CONNECTED) 49] ls -R /firstNode1
Node does not exist: /firstNode1
[zk: localhost:2181(CONNECTED) 50]

2、Java客户端API使用

引入 zookeeper Java客户端maven依赖

         <dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.5.5</version></dependency>

2.1 创建会话

通过创建ZooKeeper实例连接 zookeeper 服务器。

ZooKeeper的4种构造器源码

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException {this(connectString, sessionTimeout, watcher, false);}public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, ZKClientConfig conf) throws IOException {this(connectString, sessionTimeout, watcher, false, conf);}public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider aHostProvider) throws IOException {this(connectString, sessionTimeout, watcher, canBeReadOnly, aHostProvider, (ZKClientConfig)null);}public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider aHostProvider, ZKClientConfig clientConfig) throws IOException {
...}

参数说明

  • connectString:指 ZooKeeper 服务器列表,由英文状态逗号分开的 host:port 字符串组成。

  • sessionTimeout:指会话的超时时间,是一个以“毫秒”为单位的整型值。

  • watcher:ZooKeeper允许客户端在构造方法中传入一个接口 Watcher 的实现类对象来作为默认的W atcher事件通知处理器。当然,该参数可以设置为 null 以表明不需要设置默认的 Watcher 处理器。

  • canBeReadOnly:这是一个boolean类型的参数,用于标识当前会话是否支持“read-only (只读) ” 模式。默认情况下,在ZooK eeper集群中,一个机器如果和集群中过半及以上机器失去了网络连接,那么这个机器将不再处理客户端请求(包括读写请求)。但是在某些使用场景下,当 ZooKeeper 服务器发生此类故障的时候,我们还是希望 ZooKeeper 服务器能够提供读服务(当然写服务肯定无法提供)一一这就是 ZooKeeper 的 “read-only” 模式。

  • ZKClientConfig :配置zookeeper连接的一些相关属性

  • HostProvider : HostProvider类定义了一个客户端的服务器地址管理器

创建一个 zookeeper 连接

public class ZkConnect implements Watcher {//倒数门闩,计数为1private static CountDownLatch countDownLatch = new CountDownLatch(1);public static void main(String[] args) throws Exception {// 创建一个最基本的ZooKeeper对象实例ZooKeeper zookeeper = new ZooKeeper("k8smaster:2181",//zookeeper服务器5000, //会话超时时间new ZkConnect());System.out.println("当前zookeeper状态:" + zookeeper.getState());try {//阻塞,直到countdown计数等于0countDownLatch.await();} catch (InterruptedException e) {}System.out.println("zookeeper 会话建立");}public void process(WatchedEvent event) {System.out.println("接收到监听事件:" + event);if (Event.KeeperState.SyncConnected == event.getState()) {// 倒数,计数-1countDownLatch.countDown();}}
}

运行结果:

当前zookeeper状态:CONNECTING
接收到监听事件:WatchedEvent state:SyncConnected type:None path:null
zookeeper 会话建立

2.2 创建节点

客户端可以通过ZooKeeper的 API 来创建一个数据节点,有如下两个方法,分別以同步和异步方式创建节点:

    // 同步创建节点public String create(final String path, byte data[], List<ACL> acl, CreateMode createMode);// 异步创建节点 public void create(final String path, byte data[], List<ACL> acl, CreateMode createMode, StringCallback cb, Object ctx)

参数说明

  • path : 需要戗建的数据节点的节点路径,例如, /zk-book/foo

  • data[] : 一个字节数组,是节点创建后的初始内容

  • acl : 节点的 ACL 策略

  • createMode : 节点类型,是一个枚举类型,通常有4 种可选的节点类型:
    • 持久(PERSISTENT)
    • 持久顺序(PERSISTENT_SEQUENTIAL)
    • 临时(EPHEMERAL)
    • 临时顺序(EPHEMERAL_SEQUENTIAL)

  • cb : 异步回调函数,需要实现StringCallback接口,当服务端节点创建完毕后,Zookeeper 客户端就会自动调用这个方法可以处理相关的业务逻辑了

  • ctx : 用于传递一个对象,可以在回调方法执行的时候使用,通常是放一个上下文(Context)信息。

注意:,无论是同步还是异步接口,ZooKeeper都不支持递归创建,即无法在父节点不存在的情况下创建一个子节点。另外,如果一个节点已经存在了,那么创建同名节点的时候,会抛出 NodeExistsException 异常。

同步创建临时节点和临时顺序节点

public class ZkCreateSync implements Watcher {//倒数门闩,计数为1private static CountDownLatch countDownLatch = new CountDownLatch(1);public static void main(String[] args) throws Exception {// 创建一个最基本的ZooKeeper对象实例ZooKeeper zookeeper = new ZooKeeper("k8smaster:2181",//zookeeper服务器5000, //会话超时时间new ZkCreateSync());// 注册WatcherSystem.out.println("当前zookeeper状态:" + zookeeper.getState());//阻塞,直到countdown计数等于0countDownLatch.await();System.out.println("zookeeper 会话建立");String path1 = zookeeper.create("/zk-test-ephemeral-",// 节点路径// 节点数据"1111".getBytes(),// 对这个节点的任何操作都不受权限控制ZooDefs.Ids.OPEN_ACL_UNSAFE,// 临时节点CreateMode.EPHEMERAL);System.out.println("Success create znode: " + path1);String path2 = zookeeper.create("/zk-test-ephemeral-",//节点路径// 节点数据"2222".getBytes(),// 对这个节点的任何操作都不受权限控制ZooDefs.Ids.OPEN_ACL_UNSAFE,// 临时有序节点CreateMode.EPHEMERAL_SEQUENTIAL);System.out.println("Success create znode: " + path2);}public void process(WatchedEvent event) {System.out.println("接收到监听事件:" + event);if (Event.KeeperState.SyncConnected == event.getState()) {// 倒数,计数-1countDownLatch.countDown();}}
}

运行结果:

当前zookeeper状态:CONNECTING
接收到监听事件:WatchedEvent state:SyncConnected type:None path:null
zookeeper 会话建立
Success create znode: /zk-test-ephemeral-
Success create znode: /zk-test-ephemeral-0000000005

异步创建节点

public class ZkCreateASync implements Watcher {//倒数门闩,计数为1private static CountDownLatch countDownLatch = new CountDownLatch(1);public static void main(String[] args) throws Exception {// 创建一个最基本的ZooKeeper对象实例ZooKeeper zookeeper = new ZooKeeper("k8smaster:2181",//zookeeper服务器5000, //会话超时时间new ZkCreateASync());// 注册WatcherSystem.out.println("当前zookeeper状态:" + zookeeper.getState());//阻塞,直到countdown计数等于0countDownLatch.await();System.out.println("zookeeper 会话建立");zookeeper.create("/zk-test-ephemeral-", "".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,new IStringCallback(), "I am context.");zookeeper.create("/zk-test-ephemeral-", "".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,new IStringCallback(), "I am context.");zookeeper.create("/zk-test-ephemeral-", "".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,new IStringCallback(), "I am context.");Thread.sleep( Integer.MAX_VALUE );}public void process(WatchedEvent event) {System.out.println("接收到监听事件:" + event);if (Event.KeeperState.SyncConnected == event.getState()) {// 倒数,计数-1countDownLatch.countDown();}}
}/*** 回调*/
class IStringCallback implements AsyncCallback.StringCallback {/*** 创建结果回调** @param rc   服务端响应码 ,*             0 (Ok):接口调用成功;*             -4 (ConnectionLoss) : 客户端和服务端连接已断开;*             -110 (NodeExists) : 指定节点已存在*             112 (SessionExpired) : 会话已过期* @param path 接口调用时传入API的数据节点的节点路径参数值* @param ctx  接口调用时传入API的Ctx参数值* @param name 实际路径,如果不是顺序节点则 path和name一样*/public void processResult(int rc, String path, Object ctx, String name) {System.out.println("Create path result: [" + rc + ", " + path + ", "+ ctx + ", real path name: " + name);}
}

运行结果:

当前zookeeper状态:CONNECTING
接收到监听事件:WatchedEvent state:SyncConnected type:None path:null
zookeeper 会话建立
Create path result: [0, /zk-test-ephemeral-, I am context., real path name: /zk-test-ephemeral-
Create path result: [-110, /zk-test-ephemeral-, I am context., real path name: null
Create path result: [0, /zk-test-ephemeral-, I am context., real path name: /zk-test-ephemeral-0000000007

2.3 删除节点

客户端可以通过ZooKeeper的 API 来创建一个数据节点,有如下两个方法,分別以同步和异步方式删除节点:

//同步删除
public void delete(final String path, int version);//异步删除
public void delete(final String path, int version, VoidCallback cb, Object ctx)

参数说明

  • path : 指定数据节点的节点路径,即 API 调用的目的是删除该节点
  • version : 指定节点的数据版本,即表明本次删除操作是针对该数据版本进行的,-1 表示基于数据的最新版本进行
  • cb : 注册一个异步回调函数
  • ctx : 用于传递上下文信息的对象

如果版本号是-1,就是告诉ZooKeeper服务器,客户端需要基于数据的最新版本进行更新操作。

删除节点

public class ZkDeleteSync implements Watcher {//倒数门闩,计数为1private static CountDownLatch countDownLatch = new CountDownLatch(1);public static void main(String[] args) throws Exception {// 创建一个最基本的ZooKeeper对象实例ZooKeeper zookeeper = new ZooKeeper("k8smaster:2181",//zookeeper服务器5000, //会话超时时间new ZkDeleteSync());// 注册WatcherSystem.out.println("当前zookeeper状态:" + zookeeper.getState());//阻塞,直到countdown计数等于0countDownLatch.await();System.out.println("zookeeper 会话建立");String path = "/zk-del";path = zookeeper.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);System.out.println("建立临时节点: " + path);// 如果版本号是-1,就是告诉ZooKeeper服务器,客户端需要基于数据的最新版本进行更新操作 zookeeper.delete(path, -1);System.out.println("删除临时节点: " + path);}public void process(WatchedEvent event) {if (Event.KeeperState.SyncConnected == event.getState()) {if (Event.EventType.None == event.getType() && null == event.getPath()) {countDownLatch.countDown();}}}
}

运行结果:

当前zookeeper状态:CONNECTING
zookeeper 会话建立
建立临时节点: /zk-del
删除临时节点: /zk-del

2.4 读取数据: 子节点列表

客户端可以通过ZooKeeper的 API 来获取一个节点的所有子节点,有如下方法:

public List<String> getChildren(String path, boolean watch);
public void getChildren(final String path, Watcher watcher, ChildrenCallback cb, Object ctx);
public void getChildren(String path, boolean watch, ChildrenCallback cb, Object ctx);
public void getChildren(String path, boolean watch, Children2Callback cb, Object ctx);
public List<String> getChildren(String path, boolean watch, Stat stat);
public List<String> getChildren(final String path, Watcher watcher);
public void getChildren(final String path, Watcher watcher, ChildrenCallback cb, Object ctx);
public List<String> getChildren(final String path, Watcher watcher, Stat stat);

参数说明

  • path : 指定数据节点的节点路径,即 API 调用的该节点的子节点列表
  • watcher : 注册的 Watcher,一 旦在本次子节点获取之后,子节点列表发生变更的话,那么就会向客户端发送通知。该参数允许传入null。
  • watch : 表明是否需要注册一个Watcher,如果这个参数是true, 那么ZooKeeper客户端会自动使用创建zookeeper实例时的那个默认Watcher;如果是false,表明不需要注册Watcher。
  • cb : 注册一个异步回调函数
  • ctx : 用于传递上下文信息的对象
  • stat : 指定数据节点的节点状态信息。用法是在接口中传入一个旧的 stat 变量,该 stat 变量会在方法执行过程中,被来自服务端响应的新 stat 对象替换。

同步获取子节点列表

public class ZkGetChildrenSync implements Watcher {//倒数门闩,计数为1private static CountDownLatch countDownLatch = new CountDownLatch(1);private static ZooKeeper zk = null;public static void main(String[] args) throws Exception {zk = new ZooKeeper("k8smaster:2181",//zookeeper服务器5000, //会话超时时间new ZkGetChildrenSync());// 注册默认的Watcher事件通知处理器System.out.println("当前zookeeper状态:" + zk.getState());//阻塞,直到countdown计数等于0countDownLatch.await();System.out.println("zookeeper 会话建立");String path = "/zk-parent";zk.create(path, "".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);// 创建子节点zk.create(path + "/c1", "".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);List<String> childrenList = zk.getChildren(path, true);// 子节点列表System.out.println("子节点列表" + childrenList);zk.create(path + "/c2", "".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);zk.create(path + "/c4", "".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);childrenList = zk.getChildren(path, true);System.out.println("子节点列表" + childrenList);Thread.sleep(Integer.MAX_VALUE);}public void process(WatchedEvent event) {if (Event.KeeperState.SyncConnected == event.getState()) {if (Event.EventType.None == event.getType() && null == event.getPath()) {countDownLatch.countDown();}}}
}

运行结果:

当前zookeeper状态:CONNECTING
zookeeper 会话建立
子节点列表[c1]
子节点列表[c4, c1, c2]

异步获取子节点列表

public class ZkGetChildrenASync implements Watcher {//倒数门闩,计数为1private static CountDownLatch countDownLatch = new CountDownLatch(1);private static ZooKeeper zk = null;public static void main(String[] args) throws Exception {zk = new ZooKeeper("k8smaster:2181",//zookeeper服务器5000, //会话超时时间new ZkGetChildrenASync());// 注册Watcher//阻塞,直到countdown计数等于0countDownLatch.await();String path = "/zk-parent-async";zk.create(path, "".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);zk.create(path+"/c1", "".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);zk.getChildren(path, true, new IChildren2Callback(), null);zk.create(path+"/c2", "".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);Thread.sleep( Integer.MAX_VALUE );}public void process(WatchedEvent event) {if (Event.KeeperState.SyncConnected == event.getState()) {if (Event.EventType.None == event.getType() && null == event.getPath()) {countDownLatch.countDown();}}}
}class IChildren2Callback implements AsyncCallback.Children2Callback{public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {System.out.println("Get Children znode result: [response code: " + rc + ", param path: " + path+ ", ctx: " + ctx + ", children list: " + children + ", stat: " + stat);}
}

运行结果:

Get Children znode result: [response code: 0, param path: /zk-parent-async, ctx: null, children list: [c1], stat: 55834574861,55834574861,1608566226633,1608566226633,0,1,0,0,0,1,55834574862

2.5 读取数据: 节点数据内容

public void getData(String path, boolean watch, DataCallback cb, Object ctx);
public byte[] getData(String path, boolean watch, Stat stat);
public void getData(final String path, Watcher watcher, DataCallback cb, Object ctx);
public byte[] getData(final String path, Watcher watcher, Stat stat)

参数说明

  • path : 指定数据节点的节点路径,即 API 调用的是该节点的数据内容
  • watcher : 注册的 Watcher,一 旦在本次子节点获取之后,子节点列表发生变更的话,那么就会向客户端发送通知。该参数允许传入null。
  • watch : 表明是否需要注册一个Watcher,如果这个参数是true, 那么ZooKeeper客户端会自动使用创建zookeeper实例时的那个默认Watcher;如果是false,表明不需要注册Watcher。
  • cb : 注册一个异步回调函数
  • ctx : 用于传递上下文信息的对象
  • stat : 指定数据节点的节点状态信息。用法是在接口中传入一个旧的 stat 变量,该 stat 变量会在方法执行过程中,被来自服务端响应的新 stat 对象替换。

同步获取节点数据

public class ZkGetDataSync implements Watcher {//倒数门闩,计数为1private static CountDownLatch countDownLatch = new CountDownLatch(1);private static ZooKeeper zk = null;private static Stat stat = new Stat();public static void main(String[] args) throws Exception {zk = new ZooKeeper("k8smaster:2181",//zookeeper服务器5000, //会话超时时间new ZkGetDataSync());// 注册Watcher//阻塞,直到countdown计数等于0countDownLatch.await();String path = "/zk-sync";zk.create(path, "12".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);System.out.println(new String(zk.getData(path, true, stat)));Thread.sleep(Integer.MAX_VALUE);}public void process(WatchedEvent event) {if (Event.KeeperState.SyncConnected == event.getState()) {if (Event.EventType.None == event.getType() && null == event.getPath()) {countDownLatch.countDown();} else if (event.getType() == Event.EventType.NodeDataChanged) {try {System.out.println(new String(zk.getData(event.getPath(), true, stat)));System.out.println(stat.getCzxid() + "," +stat.getMzxid() + "," +stat.getVersion());} catch (Exception e) {}}}}
}

运行结果:

12

异步读取节点数据

public class ZkGetDataASync implements Watcher {//倒数门闩,计数为1private static CountDownLatch countDownLatch = new CountDownLatch(1);private static ZooKeeper zk = null;public static void main(String[] args) throws Exception {zk = new ZooKeeper("k8smaster:2181",//zookeeper服务器5000, //会话超时时间new ZkGetDataASync());// 注册Watcher//阻塞,直到countdown计数等于0countDownLatch.await();String path = "/zk-async";zk.create( path, "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL );zk.getData( path, true, new IDataCallback(), null );Thread.sleep(Integer.MAX_VALUE);}public void process(WatchedEvent event) {if (Event.KeeperState.SyncConnected == event.getState()) {if (Event.EventType.None == event.getType() && null == event.getPath()) {countDownLatch.countDown();}}}
}class IDataCallback implements AsyncCallback.DataCallback{public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {System.out.println(rc + ", " + path + ", " + new String(data));System.out.println(stat.getCzxid()+","+stat.getMzxid()+","+stat.getVersion());}
}

运行结果:

0, /zk-async, 123
55834574890,55834574890,0

2.6 更新数据

// 同步设置数据
public Stat setData(final String path, byte data[], int version);
// 异步设置数据
public void setData(final String path, byte data[], int version, StatCallback cb, Object ctx);

参数说明

  • path : 指定数据节点的节点路径,即 API 调用的是该节点的数据内容
  • data[] : 一个字节数组,设置的节点数据内容
  • version : 指定节点的数据版本,即表明本次删除操作是针对该数据版本进行的,-1 表示基于数据的最新版本进行
  • cb : 注册一个异步回调函数
  • ctx : 用于传递上下文信息的对象

同步更新数据

public class ZkSetDataSync implements Watcher {//倒数门闩,计数为1private static CountDownLatch countDownLatch = new CountDownLatch(1);private static ZooKeeper zk = null;private static Stat stat = new Stat();public static void main(String[] args) throws Exception {zk = new ZooKeeper("k8smaster:2181",//zookeeper服务器5000, //会话超时时间new ZkSetDataSync());// 注册Watcher//阻塞,直到countdown计数等于0countDownLatch.await();String path = "/zk-sync";zk.create(path, "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);System.out.println("getData: " + new String(zk.getData(path, true, stat)));System.out.println(stat.getCzxid() + "," +stat.getMzxid() + "," +stat.getVersion());// 如果版本号是-1,就是告诉ZooKeeper服务器,客户端需要基于数据的最新版本进行更新操作stat = zk.setData(path, "456".getBytes(), -1);System.out.println(stat.getCzxid() + "," +stat.getMzxid() + "," +stat.getVersion());System.out.println("getData: " + new String(zk.getData(path, true, stat)));// 获取 stat 对应版本的数据stat = zk.setData(path, "789".getBytes(), stat.getVersion());System.out.println(stat.getCzxid() + "," +stat.getMzxid() + "," +stat.getVersion());System.out.println("getData: " + new String(zk.getData(path, true, stat)));try {zk.setData(path, "000".getBytes(), stat.getVersion());} catch (KeeperException e) {System.out.println("Error: " + e.code() + "," + e.getMessage());}Thread.sleep(Integer.MAX_VALUE);}@Overridepublic void process(WatchedEvent event) {if (Event.KeeperState.SyncConnected == event.getState()) {if (Event.EventType.None == event.getType() && null == event.getPath()) {countDownLatch.countDown();}}}
}

运行结果:

getData: 123
64424509501,64424509501,0
64424509501,64424509502,1
getData: 456
64424509501,64424509503,2
getData: 789

异步设置数据

public class ZkSetDataASync implements Watcher {//倒数门闩,计数为1private static CountDownLatch countDownLatch = new CountDownLatch(1);private static ZooKeeper zk = null;private static Stat stat = new Stat();public static void main(String[] args) throws Exception {zk = new ZooKeeper("k8smaster:2181",//zookeeper服务器5000, //会话超时时间new ZkSetDataASync());// 注册Watcher//阻塞,直到countdown计数等于0countDownLatch.await();String path = "/zk-async";zk.create(path, "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);zk.setData(path, "456".getBytes(), -1, new IStatCallback(), null);System.out.println("getData: " + new String(zk.getData(path, true, stat)));Thread.sleep(Integer.MAX_VALUE);}@Overridepublic void process(WatchedEvent event) {if (Event.KeeperState.SyncConnected == event.getState()) {if (Event.EventType.None == event.getType() && null == event.getPath()) {countDownLatch.countDown();}}}
}class IStatCallback implements AsyncCallback.StatCallback {public void processResult(int rc, String path, Object ctx, Stat stat) {if (rc == 0) {System.out.println("SUCCESS");}}
}

运行结果:

SUCCESS
getData: 456

2.7 检测节点是否存在

检测节点是否存在的方法如下:

// 同步检测,如果节点不存在,则返回null
public Stat exists(String path, boolean watch);
// 异步检测节点是否存在
public void exists(String path, boolean watch, StatCallback cb, Object ctx);
// 同步检测,如果节点不存在,则返回null
public Stat exists(final String path, Watcher watcher);
// 异步检测节点是否存在
public void exists(final String path, Watcher watcher, StatCallback cb, Object ctx);

参数说明

  • path : 指定数据节点的节点路径,即 API 调用的目的是检测该节点是否存在
  • watcher : 注册的 Watcher,用于监听三类事件:节点被创建;节点被删除;节点被更新;
  • watch : 指定是否复用ZooKeeper中默认的Watcher。
  • cb : 注册一个异步回调函数
  • ctx : 用于传递上下文信息的对象

同步监听

public class ZkExist implements Watcher {//倒数门闩,计数为1private static CountDownLatch countDownLatch = new CountDownLatch(1);private static ZooKeeper zk = null;public static void main(String[] args) throws Exception {zk = new ZooKeeper("k8smaster:2181",//zookeeper服务器5000, //会话超时时间new ZkExist());// 注册Watcher//阻塞,直到countdown计数等于0countDownLatch.await();String path = "/zk-sync";// 同步检测,如果节点不存在,则返回null,并对该 path 进行事件监听Stat stat = zk.exists(path, true);if (stat != null) {System.out.println(stat.getCzxid() + "," +stat.getMzxid() + "," +stat.getVersion());} else {System.out.println("节点:" + path + " 不存在");}zk.create(path, "aaa".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);zk.setData(path, "123".getBytes(), -1);zk.create(path + "/c1", "bbb".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);zk.delete(path + "/c1", -1);zk.delete(path, -1);Thread.sleep(Integer.MAX_VALUE);}@Overridepublic void process(WatchedEvent event) {try {if (Event.KeeperState.SyncConnected == event.getState()) {if (Event.EventType.None == event.getType() && null == event.getPath()) {countDownLatch.countDown();} else if (Event.EventType.NodeCreated == event.getType()) {System.out.println("Node(" + event.getPath() + ")Created");zk.exists(event.getPath(), true);} else if (Event.EventType.NodeDeleted == event.getType()) {System.out.println("Node(" + event.getPath() + ")Deleted");zk.exists(event.getPath(), true);} else if (Event.EventType.NodeDataChanged == event.getType()) {System.out.println("Node(" + event.getPath() + ")DataChanged");zk.exists(event.getPath(), true);}}} catch (Exception e) {}}
}

运行结果:

节点:/zk-sync 不存在
Node(/zk-sync)Created
Node(/zk-sync)DataChanged
Node(/zk-sync)Deleted

代码:
https://github.com/wengxingxia/zk_xander.git

[慕课手记同步:zk-04-zookeeper客户端操作] https://www.imooc.com/article/313728

福利: 关注同步公众号:黑桃,回复“电子书”,领取Java技术书籍pdf(不断更新)。


欢迎关注公众号"黑桃"

zk-04-zookeeper客户端操作相关推荐

  1. Apache Kafka-初体验Kafka(04)-Java客户端操作Kafka

    文章目录 操作步骤 Maven依赖 生产者 消费者 操作步骤 Maven依赖 核心依赖 kafka-clients <dependency><groupId>org.apach ...

  2. 【Java从0到架构师】Zookeeper 应用 - Java 客户端操作、服务器动态感知、分布式锁业务处理

    分布式基石 Zookeeper 框架全面剖析 Java 客户端操作 Java 客户端 API 服务器的动态感知 服务注册 服务发现 分布式锁业务处理 单机环境(一个虚拟机中) 分布式环境_同名节点 分 ...

  3. Zookeeper 客户端API调用示例(基本使用,增删改查znode数据,监听znode,其它案例,其它网络参考资料)

    9.1 基本使用 org.apache.zookeeper.Zookeeper是客户端入口主类,负责建立与server的会话 它提供以下几类主要方法  : 功能 描述 create 在本地目录树中创建 ...

  4. zookeeper客户端库curator分析

    zookeeper客户端库curator分析 前言 综述 zookeeper保证 理解zookeeper的顺序一致性 之前使用zookeeper客户端踩到的坑 curator 连接保证 连接状态监控以 ...

  5. Zookeeper客户端Curator使用详解

    http://www.jianshu.com/p/70151fc0ef5d Zookeeper客户端Curator使用详解 简介 Curator是Netflix公司开源的一套zookeeper客户端框 ...

  6. zookeeper专题:使用zookeeper客户端实现动态监听节点并获取数据

    文章目录 1. zookeeper原生客户端 2. Curator客户端 1. zookeeper原生客户端 zookeeper原生客户端就是zookeeper官方自带的客户端,作为代码与zk服务器交 ...

  7. 【转】ZooKeeper学习第二期--Zookeeper命令操作

    一.Zookeeper的四字命令 Zookeeper支持某些特定的四字命令字母与其的交互.他们大多数是查询命令,用来获取Zookeeper服务的当前状态及相关信息.用户在客户端可以通过telnet或n ...

  8. Zookeeper客户端Curator详解

    一.Curator 客户端使用 Curator是 Netflix公司开源的一套ZooKeeper客户端框架,和 ZkClient一样它解决了非常底层的细节开发工作,包括连接.重连.反复注册Watche ...

  9. Zookeeper 客户端之 Curator

    之前写的一个在 Linux 上安装部署 Zookeeper 的笔记,其他操作系统请自行谷歌教程吧. 本文案例工程已经同步到了 github,传送门. PS : 目前还没有看过Curator的具体源码, ...

  10. 【高级篇】详解Zookeeper客户端Curator

    一.序言 之前分享过一篇关于Curotor的基本应用[基础篇]详解Zookeeper客户端Curator,如果对Curator没有了解的可以看看,本文分享关于Curator的一些高级特性,在监听和le ...

最新文章

  1. 操作系统学习笔记 第一章:操作系统概述(王道考研)
  2. Eclipse 报java.lang.OutOfMemoryError: PermGen space
  3. 技术文:微信小程序和服务器通信-WebSocket
  4. FTP的主动模式(PORT Mode)及被动模式(Passive Mode)
  5. linux 串口 vmin vtime ,Linux串口c_cc[VTIME]和c_cc[VMIN]属性设置的作用
  6. A-古代汉语知识点整理大全
  7. 如何通过Spring Boot实施Alexa技能
  8. 最新章节 第138章 量子生物计算机,生生不息的世界
  9. java c/s网络聊天室,基于c-s网络聊天室报告.doc
  10. VSCode取消注释斜体
  11. 阿里云服务器如何简单的迁移数据?阿里云通过镜像实现在线数据复制克隆
  12. oop部分,构造方法,this的使用。
  13. Redis的五种基础数据结构和三种高级数据结构
  14. 基于PHP的餐饮公司展示网站及点餐系统设计与实现
  15. The Design of design
  16. 手机安装python jam有什么影响_Python入门用Bug修改和.jam文件需要注意的问题
  17. 快手AI 实验室Y-Lab 招聘
  18. 计算机硬盘错误怎么办,电脑维修:开机遇到Windows硬盘错误画面时,该怎么做?...
  19. 微信小程序自定义输入仿咸鱼发布
  20. 南邮 OJ 1275 登山机器人问题

热门文章

  1. 【Tcl学习笔记】第3章 变量
  2. vivado 2020.2 工程保存为tcl 脚本
  3. 利用python爬虫自动登录人人网
  4. 三星I8262D制作ROM教程
  5. Word文档的四种加密方法
  6. 记一次HTTPS证书升级导致的公众号网页白屏
  7. SAP-FI模块 处理自动生成会计凭证增强
  8. maya动画制作(5)——小男孩的各种姿态动画制作(续)(修改更新)
  9. mongodb根据_id查询数据
  10. 信使服务 (Messenger)