zookeeper客户端使用与集群特性

  • zookeeper Java客户端
    • 项目构建
    • 创建客户端实例
    • 测试结果
    • 创建zookeeper实例参数说明
    • 演示java客户端增删查改zookeeper节点
  • Apache Curator 开源客户端
    • 什么是 Curator
    • Curator 实战
      • 会话创建
      • 创建节点
      • 异步接口
  • zookeeper集群&不停机动态扩容/缩容
    • zookeeper集群搭建

zookeeper Java客户端

项目构建

zookeeper 官方的客户端没有和服务端代码分离,他们为同一个jar 文件,所以我们直接引入 zookeeper的maven即可, 这里版本请保持与服务端版本一致,不然会有很多兼容性的问题

<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.5.8</version>
</dependency>

创建客户端实例

  • 定义要发送的包装类
package zookeeper.client;import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;@Data
@ToString
@NoArgsConstructor
public class MyConfig {private String key;private String name;
}
  • 定义zookeeper客户端
package zookeeper.client;import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.*;import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;@Slf4j
public class ConfigCenter {private final static  String CONNECT_STR="192.168.1.104:2181";private final static Integer  SESSION_TIMEOUT=30*1000;private static ZooKeeper zooKeeper=null;private static CountDownLatch countDownLatch=new CountDownLatch(1);public static void main(String[] args) throws IOException, InterruptedException, KeeperException {zooKeeper=new ZooKeeper(CONNECT_STR, SESSION_TIMEOUT, new Watcher() {@Overridepublic void process(WatchedEvent event) {if (event.getType()== Event.EventType.None&& event.getState() == Event.KeeperState.SyncConnected){log.info("连接已建立");countDownLatch.countDown();}}});countDownLatch.await();MyConfig myConfig = new MyConfig();myConfig.setKey("anykey");myConfig.setName("anyName");ObjectMapper objectMapper=new ObjectMapper();byte[] bytes = objectMapper.writeValueAsBytes(myConfig);String s = zooKeeper.create("/myconfig", bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);Watcher watcher = new Watcher() {@SneakyThrows@Overridepublic void process(WatchedEvent event) {if (event.getType()== Event.EventType.NodeDataChanged&& event.getPath()!=null && event.getPath().equals("/myconfig")){log.info(" PATH:{}  发生了数据变化" ,event.getPath());byte[] data = zooKeeper.getData("/myconfig", this, null);MyConfig newConfig = objectMapper.readValue(new String(data), MyConfig.class);log.info("数据发生变化: {}",newConfig);}}};byte[] data = zooKeeper.getData("/myconfig", watcher, null);MyConfig originalMyConfig = objectMapper.readValue(new String(data), MyConfig.class);log.info("原始数据: {}", originalMyConfig);TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);}
}
  • log4j.properties配置类
# Copyright 2012 The Apache Software Foundation
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.# Define some default values that can be overridden by system properties
zookeeper.root.logger=ERROR, CONSOLEzookeeper.console.threshold=INFO
zookeeper.log.dir=.
zookeeper.log.file=zookeeper.log
zookeeper.log.threshold=INFO
zookeeper.log.maxfilesize=256MB
zookeeper.log.maxbackupindex=20zookeeper.tracelog.dir=${zookeeper.log.dir}
zookeeper.tracelog.file=zookeeper_trace.loglog4j.rootLogger=${zookeeper.root.logger}#
# console
# Add "console" to rootlogger above if you want to use this
#
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.Threshold=${zookeeper.console.threshold}
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n#
# Add ROLLINGFILE to rootLogger to get log file output
#
log4j.appender.ROLLINGFILE=org.apache.log4j.RollingFileAppender
log4j.appender.ROLLINGFILE.Threshold=${zookeeper.log.threshold}
log4j.appender.ROLLINGFILE.File=${zookeeper.log.dir}/${zookeeper.log.file}
log4j.appender.ROLLINGFILE.MaxFileSize=${zookeeper.log.maxfilesize}
log4j.appender.ROLLINGFILE.MaxBackupIndex=${zookeeper.log.maxbackupindex}
log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n#
# Add TRACEFILE to rootLogger to get log file output
#    Log TRACE level and above messages to a log file
#
log4j.appender.TRACEFILE=org.apache.log4j.FileAppender
log4j.appender.TRACEFILE.Threshold=TRACE
log4j.appender.TRACEFILE.File=${zookeeper.tracelog.dir}/${zookeeper.tracelog.file}log4j.appender.TRACEFILE.layout=org.apache.log4j.PatternLayout
### Notice we are including log4j's NDC here (%x)
log4j.appender.TRACEFILE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L][%x] - %m%n
log4j.logger.zookeeper.client=INFO

测试结果


创建zookeeper实例参数说明

参数名称 含义
connectString ZooKeeper服务器列表,由英文逗号分开的host:port字符串组成, 每一个都代表一台ZooKeeper机器,如, host1:port1,host2:port2,host3:port3。另外,也可以在connectString中设置客户端连接上ZooKeeper 后的根目录,方法是在host:port字符串之后添加上这个根目录,例 如,host1:port1,host2:port2,host3:port3/zk-base,这样就指定了该客户端连 接上ZooKeeper服务器之后,所有对ZooKeeper 的操作,都会基于这个根目录。例如,客户端对/sub-node 的操作,最终创建 /zk-node/sub-node, 这个目录也叫Chroot,即客户端隔离命名空间。
sessionTimeout 会话的超时时间,是一个以“毫秒”为单位的整型值。在ZooKeeper中有会话的概念,在一个会话周期内,ZooKeeper客户端和服务器之间会通过心跳 检测机制来维持会话的有效性,一旦在sessionTimeout时间内没有进行有效 的心跳检测,会话就会失效。
watcher ZooKeeper允许 客户端在构造方法中传入一个接口 watcher (org.apache. zookeeper. Watcher)的实现类对象来作为默认的Watcher事件通知处理器。当然,该参 数可以设置为null 以表明不需要设置默认的 Watcher处理器。
canBeReadOnly 这是一个boolean类型的参数,用于标识当前会话是否支持“read-only(只 读)”模式。默认情况下,在ZooKeeper集群中,一个机器如果和集群中过半及以上机器失去了网络连接,那么这个机器将不再处理客户端请求(包括读写请求)。但是在某些使用场景下,当ZooKeeper服务器发生此类故障的时候,我 们还是希望ZooKeeper服务器能够提供读服务(当然写服务肯定无法提供)—— 这就是 ZooKeeper的“read-only”模式
sessionId和 sessionPasswd 分别代表会话ID和会话秘钥。这两个参数能够唯一确定一个会话,同时客户端使用这两个参数可以实现客户端会话复用,从而达到恢复会话的效果。具体使用方法是,第一次连接上ZooKeeper服务器时,通过调用ZooKeeper对象实例的以下两个接口,即可获得当前会话的ID和秘钥: long getSessionId(); byte[]getSessionPasswd( ); 荻取到这两个参数值之后,就可以在下次创建ZooKeeper对象实例的时候传入构造方法了

演示java客户端增删查改zookeeper节点

package zookeeper.client;import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;@Slf4j
public abstract class StandaloneBase {private static final String CONNECT_STR = "192.168.1.104:2181";private static final int SESSION_TIMEOUT = 30 * 1000;private static ZooKeeper zooKeeper = null;private static CountDownLatch countDownLatch = new CountDownLatch(1);private Watcher watcher = new Watcher() {@Overridepublic void process(WatchedEvent event) {if (event.getState() == Event.KeeperState.SyncConnected&& event.getType() == Event.EventType.None) {countDownLatch.countDown();log.info("连接建立");}}};@Beforepublic void init() {try {log.info(" start to connect to zookeeper server: {}", getConnectStr());zooKeeper = new ZooKeeper(getConnectStr(), getSessionTimeout(), watcher);log.info(" 连接中...");countDownLatch.await();} catch (IOException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}public static ZooKeeper getZooKeeper() {return zooKeeper;}@Afterpublic void test() {try {TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);} catch (InterruptedException e) {e.printStackTrace();}}protected String getConnectStr() {return CONNECT_STR;}protected int getSessionTimeout() {return SESSION_TIMEOUT;}
}
package zookeeper.client;import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
import org.junit.Test;import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;@Slf4j
public class AclOperations extends StandaloneBase {/***  用 world 模式创建节点** @throws KeeperException* @throws InterruptedException*/@Testpublic void createWithAclTest1() throws KeeperException, InterruptedException {List<ACL> acLList = new ArrayList<ACL>();ACL e = new ACL();Id m_ = new Id();m_.setId("anyone");m_.setScheme("world");int perms = ZooDefs.Perms.ADMIN  |  ZooDefs.Perms.READ;e.setId(m_);e.setPerms(perms);acLList.add(e);String s = getZooKeeper().create("/zk-node-1", "shikaiqiang".getBytes(), acLList, CreateMode.PERSISTENT);log.info("create path: {}",s);}/**** 用授权模式创建节点* @throws KeeperException* @throws InterruptedException*/@Testpublic void createWithAclTest2() throws KeeperException, InterruptedException {// 对连接添加授权信息getZooKeeper().addAuthInfo("digest","u400:p400".getBytes());List<ACL> acLList = new ArrayList<ACL>();ACL e = new ACL();Id m_ = new Id();m_.setId("u400:p400");m_.setScheme("auth");int perms = ZooDefs.Perms.ADMIN  |  ZooDefs.Perms.READ;e.setId(m_);e.setPerms(perms);acLList.add(e);String s = getZooKeeper().create("/zk-node-2", "shikaiqiang".getBytes(), acLList, CreateMode.PERSISTENT);log.info("create path: {}",s);}@Testpublic void createWithAclTest3() throws KeeperException, InterruptedException {// 对连接添加授权信息getZooKeeper().addAuthInfo("digest","u400:p400".getBytes());byte[] data = getZooKeeper().getData("/test", false, null);log.info("GET_DATA : {}",new String(data));}public static void main(String[] args) throws NoSuchAlgorithmException {String sId = DigestAuthenticationProvider.generateDigest("skq:123456");System.out.println(sId);//  -Dzookeeper.DigestAuthenticationProvider.superDigest=gj:X/NSthOB0fD/OT6iilJ55WJVado=}
}


package zookeeper.client;import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;@Slf4j
public class BaseOperations extends StandaloneBase {private String first_node = "/first-node";@Testpublic void testCreate() throws KeeperException, InterruptedException {ZooKeeper zooKeeper = getZooKeeper();String s = zooKeeper.create(first_node, "first".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);log.info("Create:{}", s);}@Testpublic void testGetData() {Watcher watcher = new Watcher() {@Overridepublic void process(WatchedEvent event) {if (event.getPath() != null && event.getPath().equals(first_node)&& event.getType() == Event.EventType.NodeDataChanged) {log.info(" PATH: {}  发现变化", first_node);try {byte[] data = getZooKeeper().getData(first_node, this, null);log.info(" data: {}", new String(data));} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}}};try {byte[] data = getZooKeeper().getData(first_node, watcher, null);  //log.info(" data: {}", new String(data));} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}@Testpublic void testSetData() throws KeeperException, InterruptedException {ZooKeeper zooKeeper = getZooKeeper();Stat stat = new Stat();byte[] data = zooKeeper.getData(first_node, false, stat);// int version = stat.getVersion();zooKeeper.setData(first_node, "third".getBytes(), 0);}@Testpublic void testDelete() throws KeeperException, InterruptedException {// -1 代表匹配所有版本,直接删除// 任意大于 -1 的代表可以指定数据版本删除getZooKeeper().delete(first_node, -1);}@Testpublic void asyncTest() {String userId = "xxx";getZooKeeper().getData("/test", false, (rc, path, ctx, data, stat) -> {Thread thread = Thread.currentThread();log.info(" Thread Name: {},   rc:{}, path:{}, ctx:{}, data:{}, stat:{}", thread.getName(), rc, path, ctx, data, stat);}, "test");log.info(" over .");}}






Apache Curator 开源客户端

什么是 Curator

Curator 是一套由netflix 公司开源的,Java 语言编程的 ZooKeeper 客户端框架,Curator项目是现在ZooKeeper 客户端中使用最多,对ZooKeeper 版本支持最好的第三方客户端,并推荐使用,Curator 把我们平时常用的很多 ZooKeeper服务开发功能做了封装,例如 Leader 选举、 分布式计数器、分布式锁。这就减少了技术人员在使用 ZooKeeper 时的大部分底层细节开发工 作。在会话重新连接、Watch 反复注册、多种异常处理等使用场景中,用原生的 ZooKeeper 处理比较复杂。而在使用 Curator 时,由于其对这些功能都做了高度的封装,使用起来更加简单,不但减少了开发时间,而且增强了程序的可靠性。

Curator 实战

首先要引入Curator 框架相关的开发包,第一个是 curator-framework 包,该包是对 ZooKeeper 底层 API 的一 些封装。另一个是 curator-recipes 包,该包封装了一些 ZooKeeper 服务的高级特性,如: Cache 事件监听、选举、分布式锁、分布式 Barrier。为了方便测试引入 了junit ,lombok,由于Zookeeper本身以来了 log4j 日志框架,所以这里可以创建对应的 log4j配置文件后直接使用。

<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.0.0</version><exclusions><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion></exclusions>
</dependency>
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-x-discovery</artifactId><version>5.0.0</version><exclusions><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion></exclusions>
</dependency>
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.5.8</version>
</dependency>
<dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13</version>
</dependency>
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.12</version>
</dependency>
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.8.3</version>
</dependency>

会话创建

要进行客户端服务器交互,第一步就要创建会话 Curator 提供了多种方式创建会话,比如用静态工厂方式创建:

// 重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3)
CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
client.start();
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder() .connectString("192.168.1.104:2181") .sessionTimeoutMs(5000) // 会话超时时间 .connectionTimeoutMs(5000) // 连接超时时间 .retryPolicy(retryPolicy) .namespace("base") // 包含隔离名称 .build();
client.start();

这段代码的编码风格采用了流式方式,最核心的类是 CuratorFramework 类,该类的作用是定 义一个 ZooKeeper 客户端对象,并在之后的上下文中使用。在定义 CuratorFramework 对象 实例的时候,我们使用了 CuratorFrameworkFactory 工厂方法,并指定了 connectionString 服务器地址列表、retryPolicy 重试策略 、sessionTimeoutMs 会话超时时间、 connectionTimeoutMs 会话创建超时时间。

connectionString:服务器地址列表,在指定服务器地址列表的时候可以是一个地址,也可以 是多个地址。如果是多个地址,那么每个服务器地址列表用逗号分隔, 如 host1:port1,host2:port2,host3;port3 。

retryPolicy:重试策略,当客户端异常退出或者与服务端失去连接的时候,可以通过设置客户端重新连接 ZooKeeper 服务端。而 Curator 提供了 一次重试、多次重试等不同种类的实现方式。在 Curator 内部,可以通过判断服务器返回的 keeperException 的状态代码来判断是否进行重试处理,如果返回的是 OK 表示一切操作都没有问题,而 SYSTEMERROR 表示系统或服务端错误。

策略名称 描述
ExponentialBackoffRetry 重试一组次数,重试之间的睡眠时间增加
RetryNTimes 重试最大次数
RetryOneTime 只重试一次
RetryUntilElapsed 在给定的时间结束之前重试

超时时间:Curator 客户端创建过程中,有两个超时时间的设置。一个是 sessionTimeoutMs 会话超时时间,用来设置该条会话在 ZooKeeper 服务端的失效时间另一个是 connectionTimeoutMs 客户端创建会话的超时时间,用来限制客户端发起一个会话连接到接收 ZooKeeper 服务端应答的时间。sessionTimeoutMs 作用在服务端,而 connectionTimeoutMs 作用在客户端。

创建节点

在 Curator 中,可以使用 create 函数创建数据节点,并通过 withMode 函数指定节点类型 (持久化节点,临时节点,顺序节点,临时顺序节点,持久化顺序节点等),默认是持久化节点,之后调用 forPath 函数来指定节点的路径和数据信息

guaranteed:该函数的功能如字面意思一样,主要起到一个保障删除成功的作用,其底层工作方式是:只要该客户端的会话有效,就会在后台持续发起删除请求,直到该数据节点在 ZooKeeper 服务端被删除

deletingChildrenIfNeeded:指定了该函数后,系统在删除该数据节点的时候会以递归的方式直接删除其子节点,以及子节点的子节点

package zookeeper.curator;import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;import java.util.concurrent.TimeUnit;@Slf4j
public abstract class CuratorStandaloneBase {private static final String CONNECT_STR = "192.168.1.104:2181";private static final int sessionTimeoutMs = 60 * 1000;private static final int connectionTimeoutMs = 5000;private static CuratorFramework curatorFramework;@Beforepublic void init() {RetryPolicy retryPolicy = new ExponentialBackoffRetry(5000, 30);curatorFramework = CuratorFrameworkFactory.builder().connectString(getConnectStr()).retryPolicy(retryPolicy).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).canBeReadOnly(true).build();curatorFramework.getConnectionStateListenable().addListener((client, newState) -> {if (newState == ConnectionState.CONNECTED) {log.info("连接成功!");}});log.info("连接中......");curatorFramework.start();}public void createIfNeed(String path) throws Exception {Stat stat = curatorFramework.checkExists().forPath(path);if (stat == null) {String s = curatorFramework.create().forPath(path);log.info("path {} created! ", s);}}public static CuratorFramework getCuratorFramework() {return curatorFramework;}@Afterpublic void test() {try {TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);} catch (InterruptedException e) {e.printStackTrace();}}protected String getConnectStr() {return CONNECT_STR;}
}

创建节点的方式如下面的代码所示,回顾我们之前课程中讲到的内容,描述一个节点要包括节点 的类型,即临时节点还是持久节点、节点的数据信息、节点是否是有序节点等属性和性质。

package zookeeper.curator;import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
import org.junit.Test;import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;@Slf4j
public class CuratorBaseOperations extends CuratorStandaloneBase {// 递归创建子节点@Testpublic void testCreateWithParent() throws Exception {CuratorFramework curatorFramework = getCuratorFramework();String pathWithParent = "/node-parent/sub-node-1";String path = curatorFramework.create().creatingParentsIfNeeded().forPath(pathWithParent);log.info("curator create node :{}  successfully.", path);}// protection 模式,防止由于异常原因,导致僵尸节点@Testpublic void testCreate() throws Exception {CuratorFramework curatorFramework = getCuratorFramework();String forPath = curatorFramework.create().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/curator-node", "some-data".getBytes());log.info("curator create node :{}  successfully.", forPath);}@Testpublic void testGetData() throws Exception {CuratorFramework curatorFramework = getCuratorFramework();byte[] bytes = curatorFramework.getData().forPath("/test");log.info("get data from  node :{}  successfully.", new String(bytes));}@Testpublic void testSetData() throws Exception {CuratorFramework curatorFramework = getCuratorFramework();curatorFramework.setData().forPath("/test", "changed!".getBytes());byte[] bytes = curatorFramework.getData().forPath("/test");log.info("get data from  node /curator-node :{}  successfully.", new String(bytes));}@Testpublic void testDelete() throws Exception {CuratorFramework curatorFramework = getCuratorFramework();String pathWithParent = "/node-parent";curatorFramework.delete().guaranteed().deletingChildrenIfNeeded().forPath(pathWithParent);}@Testpublic void testListChildren() throws Exception {CuratorFramework curatorFramework = getCuratorFramework();String pathWithParent = "/node-parent";List<String> strings = curatorFramework.getChildren().forPath(pathWithParent);strings.forEach(System.out::println);}
}






异步接口

Curator 引入了BackgroundCallback 接口,用来处理服务器端返回来的信息,这个处理过程是 在异步线程中调用,默认在 EventThread 中调用,也可以自定义线程池。

public interface BackgroundCallback
{/*** Called when the async background operation completes** @param client the client* @param event operation result details* @throws Exception errors*/public void processResult(CuratorFramework client, CuratorEvent event) throws Exception;
}

如上接口,主要参数为client 客户端 和服务端事件event

inBackground 异步处理默认在EventThread中执行,指定线程池

@Test
public void testThreadPool() throws Exception {CuratorFramework curatorFramework = getCuratorFramework();ExecutorService executorService = Executors.newSingleThreadExecutor();String ZK_NODE = "/zk-node";curatorFramework.getData().inBackground((client, event) -> {log.info(" background: {}", event);}, executorService).forPath(ZK_NODE);}

zookeeper集群&不停机动态扩容/缩容

zookeeper集群模式一共有三种类型的角色

  • Leader: 处理所有的事务请求(写请求),可以处理读请求集群中只能有一个Leader
  • Follower只能处理读请求,同时作为 Leader的候选节点,即如果Leader宕机,Follower要参与到新的Leader选举中,有可能成为新的Leader节点。
  • Observer只能处理读请求。不能参与选举

zookeeper集群搭建


配置zoo1.cfg文件

分别配置zoo1.cfg,zoo2.cfg,zoo3.cfg,zoo4.cfg



启动4个zookeeper实例

连接实例

bin/zkCli.sh -server 192.168.1.104:2181,192.168.1.104:2182,192.168.1.104:2183,192.168.1.104:2184


public static void main(String[] args) throws Exception {RetryPolicy retryPolicy=new ExponentialBackoffRetry( 5*1000, 10 );String connectStr = "192.168.1.104:2181,192.168.1.104:2182,192.168.1.104:2183,192.168.1.104:2184";CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectStr, retryPolicy);curatorFramework.start();String pathWithParent = "/test";byte[] bytes = curatorFramework.getData().forPath(pathWithParent);System.out.println(new String(bytes));while (true) {try {byte[] bytes2 = curatorFramework.getData().forPath(pathWithParent);System.out.println(new String(bytes2));TimeUnit.SECONDS.sleep(5);} catch (Exception e) {e.printStackTrace();}}}

zookeeper客户端使用与集群特性相关推荐

  1. 2.zookeeper客户端使用与集群特性

    主要内容: 客户端 zookeeper客户端简介,C客户端 客户端连接参数说明 客户端CRUD 客户端监听 集群 集群架构说明 集群配置及参数说明 选举投票机制 主从复制机制 一.客户端API常规应用 ...

  2. 二、zookeeper客户端使用和集群特性

    内容梗概: 客户端 zookeeper客户端简介,C客户端 客户端连接参数说明 客户端CRUD 客户端监听 集群 集群架构说明 集群配置及参数说明 选举投票机制 主从复制机制 一.客户端API常规应用 ...

  3. 一文带你了解Zookeeper基本概念、集群搭建、使用方法

    本文图文并茂的描述了:zookeeper是什么,演示了Zookeeper集群如何搭建.Zookeeper常用命令的使用.如何查看Zookeeper日志:详细描述了Zookeeper数据模型.watch ...

  4. zookeeper的使用与集群搭建以及原理应用

    1.zookeeper介绍     zookeeper是一个为分布式应用提供一致性服务的软件,它包含一个简单的原语集,分布式应用程序可以根据它实现同步服务,     配置维护和命名服务等.     基 ...

  5. (超详细)基于Zookeeper的Hadoop HA集群的搭建

    基于Zookeeper的Hadoop HA集群的搭建 JunLeon--go big or go home 前言: 高可用(high availability,HA)指的是若当前工作中的机器宕机了,系 ...

  6. Zookeeper源码之集群选举

    前言 zookeeper算是一个流行的分布式协调框架,在大量java分布式中间件中广泛使用.在学习zookeeper的源码前建议先了解一下分布式一致性协议的概念,zookeeper自己实现了一套满足c ...

  7. 云计算考证笔记、集群特性、HA

    目录 集群特性: 内存复用技术: 内存共享,写时复制: 内存置换: 内存气泡: NUMA: 虚拟化NUMA: 物理机NUMA: HA (High Available): 热迁移: 手工热迁移: 动态资 ...

  8. 【2】基于zookeeper,quartz,rocketMQ实现集群化定时系统

    <一>项目结构图 (1)ZK协调分配 ===>集群中的每一个定时服务器与zookeeper交互,由集群中的master节点进行任务划分,并将划分结果分配给集群中的各个服务器节点. = ...

  9. Redis基于客户端分片的集群案例(待实践)

    说明: 下面的示例基本都是基于Linux去实现,目的是为了环境的统一,以便于把性能调整到最优.且基于Java.建议生产环境不要使用Windows/Mac OS这些. 在Java领域,基于客户端进行分片 ...

最新文章

  1. css文字向右对齐_web前端入门到实战:css常用样式对文本的处理演练
  2. phpstrom php出现404
  3. 【译】最大限度地降低多线程 C# 代码的复杂性
  4. 跨域加了header也解决不了?
  5. 关于zendframework中的Zend_Db_Expr(不自动加引号)
  6. boost::hana::is_convertible用法的测试程序
  7. layui 单选框、复选框、下拉菜单 不显示问题 记录
  8. C# Global.asax.cs 定时任务
  9. 特征工程之特征选择_特征工程与特征选择
  10. Linux: shell 中命令代换 $() 和 ``(有图有代码有真相!!!)
  11. 写得好的html网页,优化网站排名-使用Markdown编写更好的内容和HTML
  12. java里decimalformat_Java中DecimalFormat用法详解
  13. 迅捷屏幕录像工具录制视频使用方法
  14. 修改键盘映射、交换按键
  15. 在Linux虚拟机终端切换普通用户身份和root身份
  16. redis 优惠券秒杀逐步优化
  17. 项目实战2 | 基于Swarm+Prometheus实现双VIP可监控Web高可用集群
  18. 初始C语言-分支与循环语句
  19. 读书笔记:《黎明之街》
  20. Nginx负载均衡策略 - least_conn 最少连接

热门文章

  1. IT行业招聘技巧--JD分析篇
  2. 北京租房子被骗---------每日反省篇
  3. HTML5 在线学习网站
  4. pear php linux,linux下安装PEAR、Zend Debugger和Smarty
  5. 安装vs2015_community()社区版+win10,安装之后,打开项目显示不兼容,应用程序未能正确安装
  6. 我的Linux学习之路(纯小白)
  7. 路缘石滑模机在作业中进行效率作业的底气
  8. yolov7+SE注意力机制(个人备忘录)
  9. WAMP和PHPStorm安装(Win10)
  10. [css]版心和布局流程