在《zookeeper快速入门——简介》一文中,我们介绍了zookeeper的机制。但是还是比较抽象,没有直观感受到它在分布式系统中的应用。本文我们使用一个例子,三次迭代演进,来说明Zookeeper Client端和Server端如何配合以实现分布式协作。(转载请指明出于breaksoftware的csdn博客)

为了例子足够简单明确,我们以实现“分布式锁”为例。所谓分布式锁,就是在一个分布式系统中,各个子系统可以共享的同一把“锁”。这样大家可以在这把锁的协调下,进行协作。

我们可以尝试在Zookeeper Server的节点树上创建一个特定名称的节点。如果创建成功了,则认为获取到了锁。Client可以执行相应业务逻辑,然后通知Server删除该节点以释放锁。其他Client可能在此时正好去创建该节点,并成功了,那么它就获得了锁。其他创建失败的Client则被认为没有获得锁,则继续等待和尝试。

可能此时你已经意识到一个问题:如果某个获得锁的Client和Server断开了连接,而没有机会通知Server删除test_lock文件。那就导致整个系统处于“死锁”状态。

不用担心,zookeeper设计了“临时”节点的概念。“临时”节点由Client向Server端请求创建,一旦Client和Server连接断开,这个Client创建的“临时”节点将被删除。这样我们就不用担心因为连接断开而导致的问题了。和普通节点一样,“临时”节点也可以被Client主动删除。

基本思路理清楚后,我们开始着手编写这块逻辑。为了简单,我们在一个进程内部使用多线程技术模拟分布在不同机器上的Client端。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <zookeeper.h>
#include <zookeeper_log.h>

第一步我们要使用zookeeper_init方法去连接Zookeeper Server。该函数原型如下

ZOOAPI zhandle_t* zookeeper_init (   const char *    host,watcher_fn     watcher,int     recv_timeout,const clientid_t *     clientid,void *     context,int     flags
)   

该方法创建了一个zhandle_t指针和一个与之绑定的连接session,之后我们将在一直要使用这个指针和Server进行通信。

但是这个函数有个陷阱:即使返回了一个可用指针,可是与之绑定的session此时不一定可用。我们需要等到ZOO_CONNECTED_STATE消息到来才能确认。此时我们就要借助zookeeper中无处不在的监视功能(watcher)。

zookeeper_init方法第二个参数传递的是一个回调函数地址——watcher,第五个参数传递的是这个回调函数可以使用的上下文信息——context。

为了让回调函数可以通知工作线程session已经可用,我们可以把上下文信息设置为一个包含条件变量的结构watchctx_t

typedef struct watchctx_t {pthread_cond_t cond;pthread_mutex_t cond_lock;
} watchctx_t;

这样在回调函数中,如果我们收到ZOO_CONNECTED_STATE通知,就触发条件变量

void main_watcher(zhandle_t* zh, int type, int state,const char* path, void* watcherCtx)
{if (type == ZOO_SESSION_EVENT) {watchctx_t *ctx = (watchctx_t*)watcherCtx;if (state == ZOO_CONNECTED_STATE) {pthread_cond_signal(&ctx->cond);}}
}

在调用zookeeper_init方法后,工作线程一直等待条件变量,如果超过设置的超时时间,就认为连接失败

int init_watchctx(watchctx_t* ctx) {if (0 != pthread_cond_init(&ctx->cond, NULL)) {fprintf(stderr, "condition init error\n");return -1;}if (0 != pthread_mutex_init(&ctx->cond_lock, NULL)) {fprintf(stderr, "mutex init error\n");pthread_cond_destroy(&ctx->cond);return -2;}return 0;
}
zhandle_t* init() {const char* host = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";int timeout = 30000;zhandle_t* zh = NULL;watchctx_t ctx;if (0 != init_watchctx(&ctx)) {return zh;}zh = zookeeper_init(host, main_watcher, timeout, 0, &ctx, 0);if (zh == NULL) {fprintf(stderr, "Error when connecting to zookeeper servers...\n");pthread_cond_destroy(&ctx.cond);pthread_mutex_destroy(&ctx.cond_lock);return zh;}struct timeval now;  struct timespec outtime;     gettimeofday(&now, NULL);  outtime.tv_sec = now.tv_sec + 1;  outtime.tv_nsec = now.tv_usec * 1000;pthread_mutex_lock(&ctx.cond_lock);int wait_result = pthread_cond_timedwait(&ctx.cond, &ctx.cond_lock, &outtime);pthread_mutex_unlock(&ctx.cond_lock);pthread_cond_destroy(&ctx.cond);pthread_mutex_destroy(&ctx.cond_lock);if (0 != wait_result) {fprintf(stderr, "Connecting to zookeeper servers timeout...\n");zookeeper_close(zh);zh = NULL;return zh;}return zh;
}

解决了连接问题,后面的逻辑就简单了。我们使用zoo_create方法创建一个路径为/test_lock的临时节点,然后通过返回结果判断是否获得锁

void thread_routine(void* ptr) {zhandle_t* zh = init();if (!zh) {return;}const char* lock_data = "lock";const char* lock_path =  "/test_lock";int ret = ZNODEEXISTS;do {ret = zoo_create(zh, lock_path, lock_data, strlen(lock_data),&ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, NULL, 0);if (ZNODEEXISTS == ret) {//fprintf(stderr, "lock exist\n");continue;}else if (ZOK == ret) {pthread_t pid = pthread_self();fprintf(stdout, "%lu get lock\n", (long long)pid);zoo_delete(zh, lock_path, -1);sleep(1);}   else {fprintf(stderr, "Error %d for %s\n", ret, "create");break;}} while (1);zookeeper_close(zh);
}

上述代码19行开始的逻辑表示这个线程获取了锁,它只是简单的打印出get lock,然后调用zoo_delete删除节点——释放锁。

这个函数使用一个while死循环来控制业务进行,这种不停调用zoo_create去检测是否获得锁的方法非常浪费资源。那我们如何对这个函数进行改造?

如果我们可以基于事件驱动监控/test_lock节点状态就好了。zookeeper也提供了这种方式——还是watcher。

void thread_routine(void* ptr) {zhandle_t* zh = init();if (!zh) {return;}watchctx_t ctx;if (0 != init_watchctx(&ctx)) {return;}const char* lock_data = "lock";const char* lock_path = "/test_lock";int ret = ZNODEEXISTS;do {struct Stat stat;int cur_st = zoo_wexists(zh, lock_path, lock_watcher, &ctx, &stat);if (ZOK == cur_st) {//fprintf(stdout, "wait\n");pthread_mutex_lock(&ctx.cond_lock);pthread_cond_wait(&ctx.cond, &ctx.cond_lock);pthread_mutex_unlock(&ctx.cond_lock);}ret = zoo_create(zh, lock_path, lock_data, strlen(lock_data),&ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, NULL, 0);if (ZNODEEXISTS == ret) {//fprintf(stderr, "lock exist\n");}else if (ZOK == ret) {pthread_t pid = pthread_self();fprintf(stdout, "%lu get lock\n", (long long)pid);zoo_delete(zh, lock_path, -1);}   else {fprintf(stderr, "Error %d for %s\n", ret, "create");}} while (1);zookeeper_close(zh);pthread_cond_destroy(&ctx.cond);pthread_mutex_destroy(&ctx.cond_lock);
}

第18行,我们调用zoo_wexists方法监控节点状态。如果监控点设置成功,则等待上文中创建的条件变量。该条件变量在zoo_wexists参数的回调函数中被设置

void lock_watcher(zhandle_t* zh, int type, int state,const char* path, void* watcherCtx)
{//sleep(1);//fprintf(stdout, "lock_watcher: %s %d, %d\n", path, type, state); if (type == ZOO_DELETED_EVENT) {//fprintf(stdout, "delete %s\n", path);watchctx_t* ctx = (watchctx_t*)watcherCtx;pthread_cond_signal(&ctx->cond);}else {//fprintf(stdout, "add %s\n", path);struct Stat stat;zoo_wexists(zh, path, lock_watcher, watcherCtx, &stat);}
}

zookeeper的监控点是一次性的,即如果一次被触发则不再触发。于是在这个回调函数中,如果我们发现节点不是被删除——监控到它被其他Client创建,就再次注册该监控点。

这样我们就使用了相对高大上的事件通知机制。但是问题随之而来,这种方式会引起惊群现象。即在一个Client释放锁后,其他Client都会尝试去调用zoo_create去获取锁,这会造成系统抖动很强烈。

我们继续改进锁的设计。现在我们换个思路,让这些Client排着队去尝试获取锁。如果做呢?

每个Client在Server上按顺序创建一个节点,并监控比自己小的那个节点。如果比自己小的那个节点(最接近自己的)被删除了,则意味着:

  1. 可能排在“我”前面的Client和Server断开了连接,那么此时应该还没轮到“我”,于是“我”要找到此时比“我”小的、最邻近的节点路径,然后去监控这个节点。
  2. 可能排在“我”前面的所有Client都获得过锁了,并且它们都释放了,现在轮到“我”来获得锁了。

采用这种方式,我们可以最大限度的减少获取锁的行为。但是这对zookeeper提出了一个要求,我们可以原子性的创建包含单调递增数字的路径的节点。非常幸运的是,zookeeper的确提供了这样的方式——顺序节点。

void thread_routine(void* ptr) {zhandle_t* zh = init();if (!zh) {return;}watchctx_t ctx;if (0 != init_watchctx(&ctx)) {return;}#define ROOT_PATH "/test_seq_lock"const char* root_path = ROOT_PATH;const char* lock_data = "lock";const char* lock_path = ROOT_PATH"/0";int ret = ZNODEEXISTS;do {const int seq_path_lenght = 512;char sequence_path[seq_path_lenght];ret = zoo_create(zh, lock_path, lock_data, strlen(lock_data),&ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL | ZOO_SEQUENCE,sequence_path, sizeof(sequence_path) - 1);if (ZNODEEXISTS == ret) {//fprintf(stderr, "lock exist\n");}else if (ZOK == ret) {ret = wait_for_lock(zh, &ctx, root_path, sequence_path);if (ZOK == ret) {pthread_t pid = pthread_self();fprintf(stdout, "%lu %s get lock\n", (long long)pid, sequence_path);sleep(0.1);}zoo_delete(zh, sequence_path, -1);}   else if (ZNONODE == ret) {ret = zoo_create(zh, root_path, lock_data, strlen(lock_data),&ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0);if (ZNODEEXISTS != ret && ZOK != ret) {fprintf(stderr, "Error %d for %s\n", ret, "create root path");break;}}else {fprintf(stderr, "Error %d for %s\n", ret, "create");}} while (1);zookeeper_close(zh);pthread_cond_destroy(&ctx.cond);pthread_mutex_destroy(&ctx.cond_lock);
}

第23行,我们给zoo_create方法传入了一个路径空间用于接收创建的有序节点路径。第28行,我们将这个路径连同条件变量一起传入自定义函数wait_for_lock去等待获得锁的时机。

int search_watch_neighbor(zhandle_t* zh, const char* root_path,const char* cur_name,char* neighbor_name, int len){struct String_vector strings;int rc = zoo_get_children(zh, root_path, 0, &strings);if (ZOK != rc || 0 == strings.count) {return ZNOTEMPTY;       }int neighbor = -1;for (int i = 0; i < strings.count; i++) {int cmp = strcmp(cur_name, strings.data[i]);if (cmp <= 0) {continue;}if (-1 == neighbor) {neighbor = i;continue;}cmp = strcmp(strings.data[neighbor], strings.data[i]);if (cmp >= 0) {continue;}neighbor = i;}if (-1 == neighbor) {*neighbor_name = 0;return ZNONODE;}int neighbor_name_len = strlen(strings.data[neighbor]);if (len < neighbor_name_len - 1) {*neighbor_name = 0;return ZBADARGUMENTS;}memcpy(neighbor_name, strings.data[neighbor], neighbor_name_len);*(neighbor_name + neighbor_name_len) = '\0';fprintf(stdout, "********\n self: %s neighbor:%s\n*********\n", cur_name, neighbor_name);return ZOK;
}void neighbor_watcher(zhandle_t* zh, int type, int state,const char* path, void* watcherCtx)
{if (type == ZOO_DELETED_EVENT) {watchctx_t* ctx = (watchctx_t*)watcherCtx;const int path_len_max = 512;char neighbor_name[path_len_max];int ret = search_watch_neighbor(zh, ctx->root_path, ctx->cur_name, neighbor_name, sizeof(neighbor_name));if (ZNONODE == ret) {pthread_cond_signal(&ctx->cond);}else if (ZOK == ret) {char neighbor_path[path_len_max];sprintf(neighbor_path, "%s/%s", ctx->root_path, neighbor_name);struct Stat stat;zoo_wexists(zh, neighbor_path, neighbor_watcher, watcherCtx, &stat);}}
}int wait_for_lock(zhandle_t* zh, watchctx_t* ctx, const char* root_path, const char* sequence_path) {strcpy(ctx->root_path, root_path);strcpy(ctx->cur_name, sequence_path + strlen(root_path) + 1);const int path_len_max = 512;char neighbor_name[path_len_max];int status = ZOK;do {int ret = search_watch_neighbor(zh, ctx->root_path, ctx->cur_name, neighbor_name, sizeof(neighbor_name));char neighbor_path[path_len_max];sprintf(neighbor_path, "%s/%s", root_path, neighbor_name);pthread_t pid = pthread_self();fprintf(stdout, "%lu get neighbor info: %d %s\n", (long long)pid, ret, neighbor_path);if (ZNONODE == ret) {status = ZOK;break;}else if (ZOK == ret) {struct Stat stat;if (ZOK == zoo_wexists(zh, neighbor_path, neighbor_watcher, ctx, &stat)) {pthread_mutex_lock(&ctx->cond_lock);pthread_cond_wait(&ctx->cond, &ctx->cond_lock);pthread_mutex_unlock(&ctx->cond_lock);}else {continue;}}else {status = ZSYSTEMERROR;break;}} while(1);return status;
}

再结合main函数的实现,两种不方式设计的分布式锁都可以运行起来

#define countof(x) sizeof(x)/sizeof(x[0])int main(int argc, const char *argv[]) {const int thread_num = 3;pthread_t ids[thread_num];for (int i = 0; i < countof(ids); i++) {pthread_create(&ids[i], NULL, (void*)thread_routine, NULL);}for (int i = 0; i < countof(ids); i++) {pthread_join(ids[i], NULL);}return 0;
}

将上述文件保存为lock_test.c,然后调用下面的指令编译

gcc -o lock_test lock_test.c -I/home/work/fangliang/zookeeper-3.4.11/src/c/generated -I/home/work/fangliang/zookeeper-3.4.11/src/c/include -L/home/work/fangliang/zookeeper-3.4.11/src/c/.libs -lzookeeper_mt -DTHREADED -std=c99

关于zookeeper库的编译,网上有很多。我编译起来还算顺利,只是在找不到so时候使用下面指令指定下查找路径

export LD_LIBRARY_PATH=/home/work/fangliang/zookeeper-3.4.11/src/c/.libs:$LD_LIBRARY_PATH

参考资料

  • https://www.cnblogs.com/xybaby/p/6871764.html
  • http://lib.csdn.net/article/hadoop/6665
  • https://www.ibm.com/developerworks/cn/opensource/os-cn-zookeeper/
  • http://www.cnblogs.com/haippy/archive/2013/02/21/2920280.html
  • http://zookeeper.sourcearchive.com/documentation/3.2.2plus-pdfsg3/zookeeper_8h.html
  • 《Zookeeper分布式过程协同技术详解》

zookeeper快速入门——应用(两种分布式锁)相关推荐

  1. java 通过redis实现倒计时_突破Java面试(42) - Redis amp; ZooKeeper两种分布式锁实现的优劣...

    0 Github 1 面试题 一般实现分布式锁都有哪些方式?使用redis如何设计分布式锁?使用zk来设计分布式锁可以吗?这两种分布式锁的实现方式哪种效率比较高? 2 考点分析 一般先问问你zk,然后 ...

  2. ZK和Redis两种分布式锁对比

    一.Redis分布式锁 1.setnx + lua脚本 优点:redis基于内存,读写性能很高,因此基于redis的分布式锁效率比较高 缺点:分布式环境下可能会有节点数据同步问题,可靠性有一定的影响. ...

  3. 什么是分布式锁?几种分布式锁分别是怎么实现的?

    一.什么是分布式锁: 1.什么是分布式锁: 分布式锁,即分布式系统中的锁.在单体应用中我们通过锁解决的是控制共享资源访问的问题,而分布式锁,就是解决了分布式系统中控制共享资源访问的问题.与单体应用不同 ...

  4. python 操作 zookeeper 快速入门

    python 操作 zookeeper 快速入门 文章目录 python 操作 zookeeper 快速入门 什么是zookeeper python 操作 zk 快速入门 zk节点 创建节点 查询节点 ...

  5. html css导航栏字体图标,HTML+CSS入门之两种图标字体库

    本篇教程介绍了HTML+CSS入门之两种图标字体库,希望阅读本篇文章以后大家有所收获,帮助大家HTML+CSS入门. < ## 0. 前言 比较基础的图标加载:和块元素的背景background ...

  6. 完全理解乘法快速幂及其两种写法的解析

    an=?0≤n≤10105an=?0≤n≤10105 a^n=? \qquad 0 \le n \le 10^{10^5} 没错,乘法快速幂就是解决上述问题的. 乘法快速幂的思想 可以看到,要求一个数 ...

  7. win8.1系统快速关机的两种方法

    win8.1系统快速关机的两种方法 前几天刚刚重装了一遍系统,觉得以前的win8.1的系统用起来还挺顺手的,这次也就装8.1的吧,装完后一切都好,但是晚上关机的时候发现一点跟我上次系统不一样的地方,就 ...

  8. 我们该使用哪种分布式锁?

    作者:孤独烟 || 责任编辑:帝都羊 来自:cnblogs.com/rjzheng/p/9310976.html 0 题记 为什么写这篇文章? 目前网上大部分的基于zookpeer,和redis的分布 ...

  9. 还不会使用分布式锁?教你三种分布式锁实现的方式

    摘要:在单进程的系统中,当存在多个线程可以同时改变某个变量时,就需要对变量或代码块做同步,使其在修改这种变量时能够线性执行消除并发修改变量,而同步本质上通过锁来实现. 本文分享自华为云社区<还不 ...

最新文章

  1. mysql 树形结构_再读MySQL索引-《高性能MySQL》索引手记
  2. BNUOJ34980方(芳)格(哥)取数(好坑)
  3. mysql 取二进制某一位_c语言中如何提取二进制数中的某一位?
  4. 所有字符不含换行 正则表达式_网络爬虫 | 正则表达式
  5. ORACLE 新建数据库及权限赋予
  6. ASCII码对照表 转帖
  7. 大数据智能分析的特征和能力
  8. SAMBA最简单的配置方法
  9. 使用coin3d画个小模型
  10. 【翻译】AdaIN:Arbitrary Style Transfer in Real-time with Adaptive Instance Normalization
  11. Android开发——回调函数实例
  12. 备忘录形成html乱码,浏览器icloud网页版备忘录乱码不能显示中文汉字-企业网站设计之中的字体坑...
  13. 让 QQ 邮箱更好用,支持桌面通知
  14. 5月第2周业务风控关注 | 央行:严禁未经授权认可的APP接入征信系统
  15. 支付宝电脑网页支付接口的调用
  16. Charles使用及常用功能介绍
  17. Windows10重装设置(个人)
  18. 电脑启动后屏幕会间歇闪烁类似刷新
  19. java模拟电梯程序_Java编写的电梯模拟系统《结对作业》
  20. android内核网络缓存,Android WebView cache 缓存 腾讯X5内核在线视频播放

热门文章

  1. ngOnChanges的简单举例概述
  2. 计算机考试报名联系邮编填什么,【职称计算机考试报名常见问题答疑】- 环球网校...
  3. C语言编程——输入三个整数x,y,z,请把这三个数由小到大输出
  4. Linux显存占用无进程清理方法
  5. MegaUpload + MegaRotic Mega Manager 1.0.1.5
  6. 局域网中无法访问的解决方法
  7. 《精益创业》读后思考 2
  8. 旋转矩阵缩水专家 bt
  9. 人人视频转型海外短视频内容社区
  10. 阿里为什么能抗住双 11 ?看完这篇你就明白了!