前言

这篇文章主要基于客户端在日常工作中使用zk的角度来写,前面南国已经就zk的理论知识做过一些讲述,对此不太熟悉的可以往前看看。Curator是Apache提供的一个zk的工具包,简化了 ZooKeeper 的操作。它增加了很多使用 ZooKeeper 开发的特性,可以处理 ZooKeeper 集群复杂的连接管理和重试机制。这里我们使用springboot 集成curator来操作zk。
假设你的服务已经正确添加了zk的相关依赖,Curator maveny依赖如下,

<!-- curator-framework -->
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>4.2.0</version><exclusions><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion></exclusions>
</dependency>
<!-- curator-recipes -->
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>4.2.0</version><exclusions><exclusion><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId></exclusion></exclusions>
</dependency>

正文

Curator 功能分两大类,一类是对 ZooKeeper 的一些基本命令的封装,比如增删改查,即 Framework 模块;另一类是他的高级特性,即 Recipes 模块。

创建CuratorFramework

Curator 框架通过 CuratorFrameworkFactory 可以通过工厂模式或者builder 模式创建 CuratorFramework 实例。CuratorFramework 实例都是线程安全的,你应该在你的应用中共享同一个 CuratorFramework 实例。
在springboot中相当于创建一个curator的配置类

@Slf4j
@Configuration
public class ZkCoreClient {// zk 服务端集群地址@Value("${zk.url}")private String zkUrl;// session 超时时间private int timeOut = 60000;// zkclient 重试间隔时间private int baseSleepTimeMs = 5000;//zkclient 重试次数private int retryCount = 5;/*** 使用double-check 创建client** @return*/@Beanpublic CuratorFramework init() {CuratorFramework client = CuratorFrameworkFactory.builder().connectString(zkUrl).sessionTimeoutMs(timeOut).retryPolicy(new ExponentialBackoffRetry(baseSleepTimeMs, retryCount))
//                            .namespace(appName).build();// 或者使用工厂模式
//                    client = CuratorFrameworkFactory.newClient(zkUrl,new ExponentialBackoffRetry(baseSleepTimeMs,retryCount)).usingNamespace(appName);client.start();log.info("client is created at ================== {}", LocalDateTime.now());return client;}}

查看curator的基本用法

/*** @author: xiejiahao* Date: 2022/3/14 17:13* Description: zk 工具类* 使用Curator 实现zk 的基本操作-增删查改数据 和监听watch*/
@Slf4j
public class ZkUtils {@AutowiredCuratorFramework client;/*** @param path* @param value* @Description: 创建路径* @Date: 2020-08-22 15:15*/public String createNode(String path, String value) throws Exception {return createNode(path, value, true);}public String createNode(String path, String value, Boolean isEphemeral) throws Exception {if (null == client) {throw new RuntimeException("there is not connect to zkServer...");}String node = client.create().creatingParentsIfNeeded().withMode(isEphemeral.equals(true) ? CreateMode.EPHEMERAL_SEQUENTIAL : CreateMode.PERSISTENT_SEQUENTIAL) // 临时顺序节点/持久顺序节点.forPath(path, value.getBytes());log.info("create node : {}", node);return node;}/*** @param path* @Description: 删除节点信息* @Date: 2020-08-22 16:01*/public void deleteNode(String path) throws Exception {if (null == client) {throw new RuntimeException("there is not connect to zkServer...");}client.delete().guaranteed() // 保障机制,若未删除成功,只要会话有效会在后台一直尝试删除.deletingChildrenIfNeeded() // 若当前节点包含子节点,子节点也删除.forPath(path);log.info("{} is deleted ", path);}/*** 判断znode是否存在,Stat就是对znode所有属性的一个映射,stat=null表示节点不存在** @param path* @return*/public Stat isExists(String path) throws Exception {if (null == client) {throw new RuntimeException("there is not connect to zkServer...");}return client.checkExists().forPath(path);}/*** 查询子节点** @param path* @return* @throws Exception*/public List<String> getChildren(String path) throws Exception {if (null == client) {throw new RuntimeException("there is not connect to zkServer...");}return client.getChildren().forPath(path);}/*** @param path* @Description: 获取节点存储的值* @Date: 2020-08-22 16:11*/public String getNodeData(String path) throws Exception {if (null == client) {throw new RuntimeException("there is not connect to zkServer...");}Stat stat = new Stat();byte[] bytes = client.getData().storingStatIn(stat).forPath(path);log.info("{} data is : {}", path, new String(bytes));log.info("current stat version is {}, createTime is {}", stat.getVersion(), stat.getCtime());return new String(bytes);}/*** @param path* @param value* @Description: 设置节点 数据* @Date: 2020-08-22 16:17*/public void setNodeData(String path, String value) throws Exception {if (null == client) {throw new RuntimeException("there is not connect to zkServer...");}Stat stat = client.checkExists().forPath(path);if (null == stat) {log.info(String.format("{} Znode is not exists", path));throw new RuntimeException(String.format("{} Znode is not exists", path));}String nodeData = getNodeData(path);client.setData().withVersion(stat.getVersion()).forPath(path, value.getBytes());log.info("{} Znode data is set. old vaule is {}, new data is {}", path, nodeData, value);}/*** @param path* @Description: 创建 给定节点的监听事件  监听一个节点的更新和创建事件(不包括删除)* @Date: 2020-08-22 16:47*/public void addWatcherWithNodeCache(String path) throws Exception {if (null == client) {throw new RuntimeException("there is not connect to zkServer...");}// dataIsCompressed if true, data in the path is compressedNodeCache nodeCache = new NodeCache(client, path, false);NodeCacheListener listener = () -> {ChildData currentData = nodeCache.getCurrentData();log.info("{} Znode data is chagnge,new data is ---  {}", currentData.getPath(), new String(currentData.getData()));};nodeCache.getListenable().addListener(listener);nodeCache.start();}/*** @param path 给定节点* @Description: 监听给定节点下的子节点的创建、删除、更新* @Date: 2020-08-22 17:14*/public void addWatcherWithChildCache(String path) throws Exception {if (null == client) {throw new RuntimeException("there is not connect to zkServer...");}//cacheData if true, node contents are cached in addition to the statPathChildrenCache pathChildrenCache = new PathChildrenCache(client, path, false);PathChildrenCacheListener listener = (client, event) -> {log.info("event path is --{} ,event type is {}", event.getData().getPath(), event.getType());};pathChildrenCache.getListenable().addListener(listener);// StartMode : NORMAL  BUILD_INITIAL_CACHE  POST_INITIALIZED_EVENT// NORMAL:异步初始化, BUILD_INITIAL_CACHE:同步初始化, POST_INITIALIZED_EVENT:异步初始化,初始化之后会触发事件pathChildrenCache.start(PathChildrenCache.StartMode.NORMAL);}/*** @param path* @Description: 监听 给定节点的创建、更新(不包括删除) 以及 该节点下的子节点的创建、删除、更新动作。* @Date: 2020-08-22 17:35*/public void addWatcherWithTreeCache(String path) throws Exception {if (null == client) {throw new RuntimeException("there is not connect to zkServer...");}TreeCache treeCache = new TreeCache(client, path);TreeCacheListener listener = (client, event) -> {log.info("节点路径 --{} ,节点事件类型: {} , 节点值为: {}", Objects.nonNull(event.getData()) ? event.getData().getPath() : "无数据", event.getType());};treeCache.getListenable().addListener(listener);treeCache.start();}}

Curator Recipes的使用

Recipes 模块主要有 Elections (选举)、Locks (锁)、Barriers (关卡)、Atomic (原子量)、Caches、Queues 等

Electtions 选举

选举主要依赖于 LeaderSelector 和 LeaderLatch 两个类。前者是所有存活的客户端不间断的轮流做 Leader。后者是一旦选举出 Leader,除非有客户端挂掉重新触发选举,否则不会交出领导权。这两者在实现上是可以切换的。

Locks(分布式锁)

curator 提供了InterProcessMutex来实现zk的分布式锁,他用acquire获取锁 release释放锁。

ZooKeeper分布式锁:
优点
ZooKeeper分布式锁(如InterProcessMutex),能有效地解决分布式问题,不可重入问题,使用起来也较为简单
缺点
ZooKeeper实现的分布式锁,性能并不太高。
因为每次在创建锁和释放锁的过程中,都要动态创建、销毁暂时节点来实现锁功能,
Zk中创建和删除节点只能通过Leader(主)服务器来执行,然后Leader服务器还需要将数据同步到所有的Follower(从)服务器上,这样频繁的网络通信,系统性能会下降。

总之,在高性能、高并发的应用场景下,不建议使用ZooKeeper的分布式锁,如果在并发量不是太高的应用场景中,还是推荐使用ZooKeeper的分布式锁。
目前分布式锁,比较成熟、主流的方案有两种:

基于Redis的分布式锁。适用于并发量很大、性能要求很高而可靠性问题可以通过其他方案去弥补的场景。
基于ZooKeeper的分布式锁。适用于高可靠,而并发量不是太高的场景
在选型时,选择适合于自己业务场景的方案即可。

简单的demo如下

@AutowiredCuratorFramework client;/*** 首先需要调用create 创建路径* 尝试获取锁 包含解锁操作* @param lockCallback* @param lockKey* @param timeout* @param <T>* @return*/public <T> TwoTuple<Boolean, T> tryLock(LockCallback<T> lockCallback, String lockKey, Long timeout) {InterProcessMutex lock = new InterProcessMutex(client, lockKey);try {if (lock.acquire(timeout, TimeUnit.MILLISECONDS)) {log.info(Thread.currentThread().getName() + " get lock");return new TwoTuple<>(true, lockCallback.exec());}} catch (Exception e) {e.printStackTrace();} finally {try {log.info(Thread.currentThread().getName() + " release lock");lock.release();} catch (Exception e) {e.printStackTrace();}}return new TwoTuple<>(false, null);}

服务注册发现

maven 添加依赖

<!-- curator 做服务发现--><dependency><groupId>org.apache.curator</groupId><artifactId>curator-x-discovery</artifactId><version>4.2.0</version></dependency>
/*** 服务注册:将当前启动的服务信息(采用map的形式)进行注册到zk中。* 这里采用的是map 可以自定义类*/public void registerService(String serviceName, String... urls) throws Exception {ServiceInstanceBuilder<Map> serviceInstanceBuilder = ServiceInstance.builder();serviceInstanceBuilder.address(InetAddress.getLocalHost().getHostAddress());serviceInstanceBuilder.port(Integer.parseInt(environment.getProperty("server.port")));serviceInstanceBuilder.name(serviceName);Map config = new HashMap();config.put("url", urls);serviceInstanceBuilder.payload(config);ServiceInstance<Map> serviceInstance = serviceInstanceBuilder.build();ServiceDiscovery<Map> serviceDiscovery = ServiceDiscoveryBuilder.builder(Map.class).client(client).serializer(new JsonInstanceSerializer<Map>(Map.class)).basePath(SERVICE_ROOT_PATH).build();serviceDiscovery.registerService(serviceInstance);serviceDiscovery.start();}/*** 服务发现:通过serviceDiscovery查询到服务*/public void discovery(String serviceName) {try {ServiceDiscovery<Map> serviceDiscovery = ServiceDiscoveryBuilder.builder(Map.class).client(client).basePath(SERVICE_ROOT_PATH).build();serviceDiscovery.start();//根据名称获取服务Collection<ServiceInstance<Map>> services = serviceDiscovery.queryForInstances(serviceName);for (ServiceInstance<Map> service : services) {System.out.print(service.getPayload() + " -- ");System.out.println(service.getAddress() + ":" + service.getPort());}System.out.println();} catch (Exception e) {e.printStackTrace();}}

参考资料:
https://blog.csdn.net/smartbetter/article/details/53083816
https://www.icode9.com/content-1-116250.html

Springboot使用Curator 集成zk相关推荐

  1. SpringBoot 2.x 集成 Redis

    SpringBoot 2.x 集成 Redis windows上搭建redis环境 添加依赖 此处redis客户端使用jedis. <!-- redis --> <dependenc ...

  2. springboot security 权限校验_十二、SpringBoot 优雅的集成Spring Security

    前言 至于什么是Spring security ,主要两个作用,用户认证和授权.即我们常说的,用户只有登录了才能进行其他操作,没有登录的话就重定向到登录界面.有的用户有权限执行某一操作,而有的用户不能 ...

  3. SpringBoot与Docker集成

    SpringBoot与Docker集成 许多人正在使用容器包装其Spring Boot应用程序,而构建容器并不是一件容易的事.这是Spring Boot应用程序开发人员的指南,容器对于开发人员而言并非 ...

  4. springboot使用curator来实现leader选举

    本文来说下springboot使用curator来实现leader选举 文章目录 概述 概述

  5. springboot使用curator实现服务的注册和发现

    本文来说下springboot使用curator实现服务的注册和发现 文章目录 概述 概述

  6. SpringBoot与SpringCloud集成

    SpringBoot与SpringCloud集成 : 简介 Spring Cloud是一系列框架的有序集合.它利用Spring Boot的开发便利性巧妙地简化了分布式系统基础设施的开发,如服务发现注册 ...

  7. SpringBoot和Elasticsearch集成

    SpringBoot和Elasticsearch的集成: 步骤 依赖 在Maven的pom文件中 123456789 <!--SpringBoot默认使用SpringData ElasticSe ...

  8. springboot整合curator实现分布式锁

    理论篇: Curator是Netflix开源的一套ZooKeeper客户端框架. Netflix在使用ZooKeeper的过程中发现ZooKeeper自带的客户端太底层, 应用方在使用的时候需要自己处 ...

  9. SpringBoot使用JWT集成Ng-Alain之Token失效处理

    在 SpringBoot使用JWT集成Ng-Alain中,我们简单介绍了SpringBoot与Ng-Alain的集成,在这种前后端分离框架实践中,我们使用了JWT来作为交互的安全标识,考虑一个问题,从 ...

  10. 一个springboot 项目a集成另一个springboot 项目b

    一个springboot 项目a集成另一个springboot 项目b 并且可以运行访问b的controller层 操作1: 项目b打包依赖修改,把上面的springboot默认打包依赖注释,改为下面 ...

最新文章

  1. Java单元测试的意义_单元测试重要意义及方法介绍
  2. grafana 画拓扑图 能不能_Grafana之ImageIt实现动态可感知网络拓扑(第十七篇)
  3. mysql 导出gbk_mysqldump指定编码导出数据,GBK编码实践
  4. 解决启动springboot项目时localhost一直显示Whitelabel Error Page和@ConfigurationProperties标红
  5. iPhone 13 Pro手机壳曝光 网友:更丑了
  6. 苹果指控高通从事非法专利授权活动 索赔最高270亿美元
  7. vb脚本从入门到精通_sqlmap从入门到精通第七章720 绕过WAF脚本space2morecommentamp;space2morehash...
  8. 华为机试HJ32:密码截取
  9. linux 会不会受到永恒之蓝漏洞,永恒之蓝漏洞复现(ms17-010)
  10. 绘制箱线图的标签python_利用Python - Matplotlib 绘制箱线图
  11. is_callable_Python callable()和__call __()
  12. wincc工程组态论文_仪表人零基础学系统组态,必备知识!2020.12.12
  13. vhdl入门6分频器
  14. 疯狂突破高中句型300句
  15. 计算机信息安全培训简报,以训促防 筑牢网络信息安全 我局举办网络信息安全培训...
  16. PS魔棒工具的使用方法
  17. Mac如何查看系统根目录
  18. 中国式家长计算机怎么学,中国式家长开局学习技巧详解 大神教你如何完美开局...
  19. 计算机考试记不住题目,驾考科目一口诀,科一题目太多记不住?快来看看这些技巧...
  20. 基于Modis数据监测森林火灾

热门文章

  1. Spring的初体验-1
  2. uniapp在移动端软键盘监听(弹出,收起),及影响定位布局的问题
  3. 机器学习数学基础十:相关分析
  4. 四面体体积公式 hdu 1411
  5. 台式机标准计算机配置清单,台式机组装,教您组装电脑高配置清单
  6. ERP: ERP系统的作用
  7. 一文读懂局域网、广域网、WLAN、WiFi的联系与区别
  8. R语言根据日历周期处理时间序列数据(周、月、年等):使用xts包的apply.quarterly函数和mean函数计算时间序列的季度平均值(quarterly)
  9. iOS 地图坐标系转换
  10. PGSQL查询今天生日的员工