Zookeeper
Zookeeper
概述
Zookeeper 是一个开源的分布式的,为分布式框架提供协调服务的 Apache 项目。
Zookeeper = 文件系统 + 通知机制
特点
- Zookeeper:一个领导者(Leader),多个跟随者(Follower)组成的集群。
- 集群中只要有半数以上节点存活,Zookeeper集群就能正常服务。所以Zookeeper适合安装奇数台服务器。
- 全局数据一致:每个Server保存一份相同的数据副本,Client无论连接到哪个Server,数据都是一致的。
- 更新请求顺序执行,来自同一个Client的更新请求按其发送顺序依次执行。
- 数据更新原子性,一次数据更新要么成功,要么失败。
- 实时性,在一定时间范围内,Client能读到最新数据。
数据结构
ZooKeeper 数据模型的结构与 Unix 文件系统很类似,整体上可以看作是一棵树,每个 节点称做一个 ZNode。每一个 ZNode 默认能够存
储 1MB
的数据,每个 ZNode 都可以通过其路径唯一标识。
应用场景
提供的服务包括:统一命名服务、统一配置管理、统一集群管理、服务器节点动态上下线、软负载均衡等。
统一命名服务
分布式环境下,为多台服务器统一命名。
统一配置管理
分布式环境下,配置文件同步非常常见。一般要求一个集群中,所有节点的配置信息是 一致的,比如 Kafka 集群。对配置文件修改后,希望能够快速同步到各个节点上。可将配置信息写入 ZooKeeper 上的一个Znode。一旦Znode中的数据被修改,ZooKeeper将通知各个客户端服务器。
统一集群管理
分布式环境中,实时掌握每个节点的状态是必要的。ZooKeeper 可以实现实时监控节点状态变化。
服务器动态上下线
客户端能实时洞察到服务器上下线的变化
软负载均衡
在Zookeeper中记录每台服务器的访问数,让访问数最少的服务器去处理最新的客户端请求
下载
本地安装
JDK
1
yum -y install java-1.8.0-openjdk-devel.x86_64
拷贝 apache-zookeeper-3.5.7-bin.tar.gz 安装包到 Linux 系统下
解压到指定目录
1
tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module/
修改名称
1
mv apache-zookeeper-3.5.7-bin/ zookeeper-3.5.7
修改配置
将
/opt/module/zookeeper-3.5.7/conf
这个路径下的zoo_sample.cfg
修改为zoo.cfg
1
mv zoo_sample.cfg zoo.cfg
打开
zoo.cfg
文件,修改dataDir
路径1
dataDir=/opt/module/zookeeper-3.5.7/zkData
在
/opt/module/zookeeper-3.5.7/
这个目录上创建 zkData 文件夹1
mkdir zkData
操作Zookeeper
启动 Zookeeper
1
[root@instance-jwzbgijw zookeeper-3.5.7]# bin/zkServer.sh start
查看进程是否启动
1
2
3[root@instance-jwzbgijw zookeeper-3.5.7]# jps
3253 Jps
2680 QuorumPeerMain查看状态
1
2
3
4
5
6[root@instance-jwzbgijw zookeeper-3.5.7]# bin/zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: standalone启动客户端
1
[root@instance-jwzbgijw zookeeper-3.5.7]# bin/zkCli.sh
退出客户端
1
[zk: localhost:2181(CONNECTED) 0] quit
停止 Zookeeper
1
[root@instance-jwzbgijw zookeeper-3.5.7]# bin/zkServer.sh stop
配置参数
Zookeeper 中的配置文件 zoo.cfg 中参数含义解读
tickTime = 2000:通信心跳时间,Zookeeper 服务器与客户端心跳时间,单位毫秒
initLimit = 10:LF 初始通信时限
Leader 和 Follower 初始连接时能容忍的最多心跳数(tickTime的数量)
syncLimit = 5:LF 同步通信时限
Leader 和 Follower 之间通信时间如果超过 syncLimit * tickTime,Leader认为Follwer死掉,从服务器列表中删除 Follwer。
dataDir:保存Zookeeper中的数据
默认的 tmp 目录,容易被Linux系统定期删除,所以一般不用默认的tmp目录。
clientPort = 2181:客户端连接端口,通常不做修改
集群
集群安装
规划
在 hadoop101、hadoop102 和 hadoop103 三个节点上都部署 Zookeeper。
解压安装
在 hadoop101 解压 Zookeeper 安装包到
/opt/module/
目录下1
[hadoop@hadoop101 software]$ tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module/
修改 apache-zookeeper-3.5.7-bin 名称为 zookeeper-3.5.7
1
[hadoop@hadoop101 module]$ mv apache-zookeeper-3.5.7-bin/ zookeeper-3.5.7
配置服务器编号
在
/opt/module/zookeeper-3.5.7/
目录下创建zkData
文件夹1
[hadoop@hadoop101 zookeeper-3.5.7]$ mkdir zkData
在
/opt/module/zookeeper-3.5.7/zkData
目录下创建myid
的文件1
[hadoop@hadoop101 zkData]$ vi myid
在文件中添加与 server 对应的编号(注意:上下不要有空行,左右不要有空格)
1
1
分发配置好的 zookeeper 到其他机器上,并分别在 hadoop102、hadoop103 上修改 myid 文件中内容为 2、3
1
[hadoop@hadoop101 module]$ xsync zookeeper-3.5.7
配置 zoo.cfg 文件
重命名
/opt/module/zookeeper-3.5.7/conf
这个目录下的zoo_sample.cfg
为zoo.cfg
1
[hadoop@hadoop101 conf]$ mv zoo_sample.cfg zoo.cfg
修改 zoo.cfg 文件
1
dataDir=/opt/module/zookeeper-3.5.7/zkData
增加如下配置
1
2
3
4#######################cluster##########################
server.1=hadoop101:2888:3888
server.2=hadoop102:2888:3888
server.3=hadoop103:2888:3888server.A=B:C:D
:A
:表示第几号服务器,集群模式下配置一个文件 myid,这个文件在 dataDir 目录下,这个文件里面有一个数据 就是 A 的值,Zookeeper 启动时读取此文件,拿到里面的数据与 zoo.cfg 里面的配置信息比 较从而判断到底是哪个 server;
B
:这个服务器的地址;C
:这个服务器 Follower 与集群中的 Leader 服务器交换信息的端口;D
:万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口。
同步 zoo.cfg 配置文件
1
[hadoop@hadoop101 conf]$ xsync zoo.cfg
操作
分别启动 Zookeeper
1
2
3[hadoop@hadoop101 zookeeper-3.5.7]$ bin/zkServer.sh start
[hadoop@hadoop102 zookeeper-3.5.7]$ bin/zkServer.sh start
[hadoop@hadoop103 zookeeper-3.5.7]$ bin/zkServer.sh start查看状态
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17[hadoop@hadoop101 zookeeper-3.5.7]$ bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower
[hadoop@hadoop102 zookeeper-3.5.7]$ bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: leader
[hadoop@hadoop103 zookeeper-3.5.7]$ bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower
选举机制
第一次启动
- 服务器1启 动,发起一次选举。服务器1投自己一票。此时服务器1票数一票,不够半数以上(3票),选举无法完成,服务器1状态保持为 LOOKING;
- 服务器2启动,再发起一次选举。服务器1和2分别投自己一票并交换选票信息:此时服务器1发现服务器2的myid比自己目前投票推举的(服务器1)大,更改选票为推举服务器2。此时服务器1票数0票,服务器2票数2票,没有半数以上结果,选举无法完成,服务器1,2状态保持LOOKING
- 服务器3启动,发起一次选举。此时服务器1和2都会更改选票为服务器3。此次投票结果:服务器1为0票,服务器2为0票,服务器3为3票。此时服务器3的票数已经超过半数,服务器3当选Leader。服务器1,2更改状态为FOLLOWING,服务器3更改状态为LEADING;
- 服务器4启动,发起一次选举。此时服务器1,2,3已经不是LOOKING状态,不会更改选票信息。交换选票信息结果:服务器3为3票,服务器4为 1票。此时服务器4服从多数,更改选票信息为服务器3,并更改状态为FOLLOWING;
- 服务器5启动,同4一样当小弟。
非第一次启动
当ZooKeeper集群中的一台服务器出现以下两种情况之一时,就会开始进入Leader选举:
- 服务器初始化启动
- 服务器运行期间无法和Leader保持连接
而当一台机器进入Leader选举流程时,当前集群也可能会处于以下两种状态:
集群中本来就已经存在一个Leader。 对于已经存在Leader的情况,机器试图去选举Leader时,会被告知当前服务器的Leader信息,对于该机器来说,仅仅需要和Leader机器建立连接,并进行状态同步即可。
集群中确实不存在Leader
假设ZooKeeper由5台服务器组成,SID分别为1、2、3、4、5,ZXID分别为8、8、8、7、7,并且此时SID为3的服务器是Leader。某一时刻, 3和5服务器出现故障,因此开始进行Leader选举。
选举规则:1、EPOCH大的直接胜出;2、②EPOCH相同,事务id大的胜出;3、事务id相同,服务器id大的胜出。
SID:
服务器ID。用来唯一标识一台 ZooKeeper 集群中的机器,每台机器不能重复,和myid一致。ZXID:
事务ID。ZXID是一个事务ID,用来标识一次服务器状态的变更。在某一时刻, 集群中的每台机器的ZXID值不一定完全一 致,这和ZooKeeper服务器对于客户端“更新请求”的处理逻辑有关。Epoch:
每个Leader任期的代号。没有 Leader 时同一轮投票过程中的逻辑时钟值是相同的。每投完一次票这个数据就会增加
群起群停
在 hadoop101 的
/home/hadoop/bin
目录下创建脚本1
[hadoop@hadoop101 bin]$ vim zk.sh
脚本内容如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24!/bin/bash
case $1 in
"start"){
for i in hadoop101 hadoop102 hadoop103
do
echo ---------- zookeeper $i 启动 ------------
ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh start"
done
};;
"stop"){
for i in hadoop101 hadoop102 hadoop103
do
echo ---------- zookeeper $i 停止 ------------
ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh stop"
done
};;
"status"){
for i in hadoop101 hadoop102 hadoop103
do
echo ---------- zookeeper $i 状态 ------------
ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh status"
done
};;
esac增加脚本执行权限
1
[hadoop@hadoop101 bin]$ chmod u+x zk.sh
Zookeeper 集群群起
1
[hadoop@hadoop101 ~]$ zk.sh start
Zookeeper 集群群停
1
[hadoop@hadoop101 ~]$ zk.sh stop
查看集群状态
1
[hadoop@hadoop101 ~]$ zk.sh status
在任意目录下都可以直接执行 zk.sh
命令,同 xsync
客户端
命令行
常用命令,具体 help 查看
命令 | 解释 |
---|---|
help | 显示所有操作命令 |
ls path | 使用 ls 命令来查看当前 znode 的子节点 [可监听] -w 监听子节点变化 -s 附加次级信息 |
create | 普通创建 -s 含有序列 -e 临时(重启或者超时消失) |
get path | 获得节点的值 [可监听] -w 监听节点内容变化 -s 附加次级信息 |
set | 设置节点的数据值 |
stat | 查看节点状态 |
delete | 删除节点 |
deleteall | 递归删除节点 |
节点信息
启动客户端
1
[hadoop@hadoop101 zookeeper-3.5.7]$ bin/zkCli.sh -server hadoop101:2181
查看当前 znode 中所包含的内容
查看当前节点详细数据
czxid
:创建节点的事务 zxid每次修改 ZooKeeper 状态都会产生一个 ZooKeeper 事务 ID。事务 ID 是 ZooKeeper 中所有修改总的次序。每次修改都有唯一的 zxid,如果 zxid1 小于 zxid2,那么 zxid1 在 zxid2 之前发生。
ctime
:znode 被创建的毫秒数(从 1970 年开始)mzxid
:znode 最后更新的事务 zxidmtime
:znode 最后修改的毫秒数(从 1970 年开始)pZxid
:znode 最后更新的子节点 zxidcversion
:znode 子节点变化号,znode 子节点修改次数dataversion
:znode 数据变化号aclVersion
:znode 访问控制列表的变化号ephemeralOwner
:如果是临时节点,这个是 znode 拥有者的 session id。如果不是临时节点为 0dataLength
:znode 的数据长度numChildren
:znode 子节点数量
节点类型
概述
- 持久(Persistent):客户端和服务器端断开连接后,创建的节点不删除
- 短暂(Ephemeral):客户端和服务器端断开连接后,创建的节点自己删除
它们各自又可以分为
有序号
和无序号
节点
说明:创建 znode 时设置顺序标识,在 znode 名称后会附加一个值,顺序号是一个单调递增的计数器,由父节点维护。
注意:在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序。
从左至右:
- 持久化目录节点
- 持久化顺序编号目录节点
- 临时目录节点
- 临时顺序编号目录节点
实例
引号内容为数据值
创建永久节点(不带序号)
获得节点信息
创建永久节点(带序号)
先创建一个普通的根节点
创建带序号的永久节点
如果原来没有序号节点,序号从 0 开始依次递增。如果原节点下已有 2 个节点,则再排序时从 2 开始,以此类推。
创建临时节点(不带序号)
创建临时节点(带序号)
节点操作
列举几个
删除节点
1
delete /test1/test66
递归删除节点
将节点 test2 及以下节点删除
1
deleteall /test1/test2
查看节点状态
1
stat /test1
监听器
原理
- 首先要有一个main()线程;
- 在 main 线程中创建 Zookeeper 客户端,这时就会创建两个线程,一个负责网络连接通信(connet),一个负责监听(listener);
- 通过 connect 线程将注册的监听事件发送给Zookeeper;
- 在 Zookeeper 的注册监听器列表中将注册的监听事件添加到列表中;
- Zookeeper 监听到有数据或路径变化,就会将这个消息发送给 listener 线程
- listener 线程内部调用 process() 方法
实例
常见监听:
- 监听节点数据的变化 get path [watch]
- 监听子节点增减的变化 ls path [watch]
节点值变化监听
1)在 hadoop103 主机上注册监听
/test1
节点数据变化1
[zk: hadoop103:2181(CONNECTED) 1] get -w /test1
2)在 hadoop102 主机上修改
/test1
节点的数据1
[zk: hadoop102:2181(CONNECTED) 1] set /test1 "xiugai"
3)观察 hadoop103 主机收到数据变化的监听
注意:如果再修改
/test1
节点的值,hadoop103 不会再收到监听。因为注册 一次,只能监听一次。想再次监听,需要再次注册。节点的子节点变化监听(路径变化)
1)在 hadoop103 注册监听
/test1
节点的子节点变化1
2[zk: hadoop103:2181(CONNECTED) 0] ls -w /test1
[test2, test3]2)在 hadoop102 主机
/test1
节点上创建子节点1
2[zk: hadoop102:2181(CONNECTED) 2] create /test1/test66 "666"
Created /test1/test663)观察 hadoop103 主机收到子节点变化的监听
注意:节点的路径变化,也是注册一次,生效一次。想多次生效,就需要多次注册。
API
即在 idea 中操作,前提集群启动着。
创建一个 maven 工程
引入依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.7</version>
</dependency>
</dependencies>配置
log4j.properties
在项目
src/main/resources
目录下,新建log4j.properties
文件,在文件中填入如下内容:1
2
3
4
5
6
7
8log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n测试类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51public class zkClient {
// 本机如果没有配置映射,hadoop101等得改成对应ip地址
private String connectString = "hadoop101:2181,hadoop102:2181,hadoop103:2181";
// 超时时间 2s
private int sessionTimeout = 2000;
private ZooKeeper zkClient;
public void init() throws IOException, InterruptedException, KeeperException {
// 创建客户端连接
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
// 监听逻辑
public void process(WatchedEvent watchedEvent) {
List<String> children = null;
try {
// 运行程序,当节点变化会重新获取节点
children = zkClient.getChildren("/", true);
for (String child : children) {
System.out.println(child);
}
System.out.println("-------------------");
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
public void getChildren() throws Exception {
// 创建子节点
// 参数 1:要创建的节点的路径; 参数 2:节点数据 ; 参数 3:节点权限 ;参数 4:节点的类型
zkClient.create("/newNode", "111".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// 获取子节点 并 监听节点变化(true监听)只执行一次
List<String> children = zkClient.getChildren("/", true);
for (String child : children) {
System.out.println(child);
}
System.out.println("-------------------");
// 延时阻塞(不让程序那么快结束,为了继续监听节点)
Thread.sleep(Long.MAX_VALUE);
}
}之后只要阻塞还没结束,就会继续监听
其它操作
等等
1
2// 判断节点是否存在
Stat stat = zkClient.exists("/test1", false);
写数据原理
即客户端向服务端写数据的流程
写流程之写入请求发送给 Leader 节点
写流程之写入请求发送给 Follower 节点
服务器动态上下线
分析
实现
在集群创建
/servers
节点1
[zk: hadoop101:2181(CONNECTED) 7] create /servers "servers"
服务器端代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44public class DistributeServer {
private static String connectString = "hadoop101:2181,hadoop102:2181,hadoop103:2181";
private static int sessionTimeout = 2000;
private ZooKeeper zkClient = null;
private String parentNode = "/servers";
// 创建到 zk 的客户端连接
public void getConnect() throws Exception {
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
public void process(WatchedEvent watchedEvent) {
}
});
}
// 注册服务器
public void registerServer(String hostname) throws Exception {
String create = zkClient.create(parentNode + "/server",
hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(hostname +" is online "+ create);
}
// 业务功能
public void business(String hostname) throws Exception{
System.out.println(hostname + " is working ...");
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws Exception{
DistributeServer server = new DistributeServer();
// 1 获取zk连接
server.getConnect();
// 2 注册服务器
server.registerServer(args[0]);
// 3 业务
server.business(args[0]);
}
}客户端代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54public class DistributeClient {
private static String connectString = "hadoop101:2181,hadoop102:2181,hadoop103:2181";
private static int sessionTimeout = 2000;
private ZooKeeper zkClient = null;
private String parentNode = "/servers";
// 创建到 zk 的客户端连接
public void getConnect() throws Exception {
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
public void process(WatchedEvent watchedEvent) {
// 再次启动监听(因为zk注册一次监听只能监听一次)
try {
getServerList();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
// 获取服务器列表信息
public void getServerList() throws Exception {
// 1 获取服务器子节点信息,并且对父节点进行监听
List<String> children = zkClient.getChildren(parentNode, true);
// 2 存储服务器信息列表
ArrayList<String> servers = new ArrayList<>();
// 3 遍历所有节点,获取节点中的主机名称信息
for (String child : children) {
byte[] data = zkClient.getData(parentNode + "/" + child,
false, null);
servers.add(new String(data));
}
// 4 打印服务器列表信息
System.out.println(servers);
}
// 业务功能
public void business() throws Exception{
System.out.println("client is working ...");
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws Exception {
DistributeClient client = new DistributeClient();
// 1 获取 zk 连接
client.getConnect();
// 2 业务进程启动
client.business();
}
}
分布式锁
概述
比如说某进程在使用资源的时候,会先去获得锁,获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,用完资源以后就
将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们
把这个分布式环境下的这个锁叫作分布式锁。
分析
实现
原生实现
代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99public class DistributedLock {
private String connectString = "hadoop101:2181,hadoop102:2181,hadoop103:2181";
private int sessionTimeout = 2000;
private ZooKeeper zk;
private String rootNode = "locks";
private String subNode = "seq-";
// 当前 client 等待的子节点
private String waitPath;
// 连接等待
private CountDownLatch connectLatch = new CountDownLatch(1);
// 节点删除等待
private CountDownLatch waitLatch = new CountDownLatch(1);
// 当前 client 创建的子节点
private String currentNode;
// 和 zk 服务建立连接,并创建根节点
public DistributedLock() throws Exception {
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
public void process(WatchedEvent event) {
// 连接建立时, 唤醒 connectLatch 线程
if (event.getState() == Event.KeeperState.SyncConnected) {
connectLatch.countDown();
}
// 发生了 waitPath 的删除事件,唤醒 waitLatch 线程
if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath))
{
waitLatch.countDown();
}
}
});
// 等待连接建立
connectLatch.await();
//获取根节点状态
Stat stat = zk.exists("/" + rootNode, false);
//如果根节点不存在,则创建根节点,根节点类型为永久节点
if (stat == null) {
System.out.println("根节点不存在");
zk.create("/" + rootNode, new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
// 加锁方法
public void zkLock() {
try {
//在根节点下创建临时顺序节点,返回值为创建的节点路径
currentNode = zk.create("/" + rootNode + "/" + subNode,
null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
// wait 一小会, 让结果更清晰一些
Thread.sleep(10);
// 获取 /locks 下所有节点
List<String> childrenNodes = zk.getChildren("/" + rootNode, false);
// 列表中只有一个子节点, 那肯定就是 currentNode , 说明 client 获得锁
if (childrenNodes.size() == 1) {
return;
} else {
//对根节点下的所有临时顺序节点进行从小到大排序
Collections.sort(childrenNodes);
//当前节点名称
String thisNode = currentNode.substring(("/" + rootNode + "/").length());
//获取当前节点的位置
int index = childrenNodes.indexOf(thisNode);
if (index == -1) {
System.out.println("数据异常");
} else if (index == 0) {
// index == 0, 说明 thisNode 在列表中最小, 当前 client 获得锁
return;
} else {
// 获得排名比 currentNode 前 1 位的节点
this.waitPath = "/" + rootNode + "/" + childrenNodes.get(index - 1);
// 在 waitPath 上注册监听器, 当 waitPath 被删除时, zookeeper 会回调监听器的 process 方法
zk.getData(waitPath, true, new Stat());
//进入等待锁状态
waitLatch.await();
return;
}
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 解锁方法
public void zkUnlock() {
try {
zk.delete(this.currentNode, -1);
} catch (InterruptedException | KeeperException e) {
e.printStackTrace();
}
}
}测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42public class DistributedLockTest {
public static void main(String[] args) throws Exception {
// 创建分布式锁 1
final DistributedLock lock1 = new DistributedLock();
// 创建分布式锁 2
final DistributedLock lock2 = new DistributedLock();
new Thread(new Runnable() {
public void run() {
// 获取锁对象
try {
lock1.zkLock();
System.out.println("线程 1 获取锁");
Thread.sleep(5 * 1000);
lock1.zkUnlock();
System.out.println("线程 1 释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
public void run() {
// 获取锁对象
try {
lock2.zkLock();
System.out.println("线程 2 获取锁");
Thread.sleep(5 * 1000);
lock2.zkUnlock();
System.out.println("线程 2 释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
}结果
1
2
3
4线程 1 获取锁
线程 1 释放锁
线程 2 获取锁
线程 2 释放锁
Curator框架实现
原生的 Java API 开发存在的问题:
- 会话连接是异步的,需要自己去处理。比如使用 CountDownLatch
- Watch 需要重复注册,不然就不能生效
- 开发的复杂性还是比较高的
- 不支持多节点删除和创建。需要自己去递归
Curator 是一个专门解决分布式锁的框架,解决了原生 JavaAPI 开发分布式遇到的问题。
详情请查看官方文档
Curator 案例实操:
添加依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>4.3.0</version>
</dependency>代码实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81public class CuratorLockTest {
private String rootNode = "/locks";
private String connectString = "hadoop101:2181,hadoop102:2181,hadoop103:2181";
private int connectionTimeout = 2000;
private int sessionTimeout = 2000;
public static void main(String[] args) {
new CuratorLockTest().test();
}
// 测试
private void test(){
// 创建分布式锁 1
final InterProcessLock lock1 = new InterProcessMutex(getCuratorFramework(), rootNode);
// 创建分布式锁 2
final InterProcessLock lock2 = new InterProcessMutex(getCuratorFramework(), rootNode);
new Thread(new Runnable() {
public void run() {
// 获取锁对象
try {
lock1.acquire();
System.out.println("线程 1 获取锁");
// 测试锁重入
lock1.acquire();
System.out.println("线程 1 再次获取锁");
Thread.sleep(5 * 1000);
lock1.release();
System.out.println("线程 1 释放锁");
lock1.release();
System.out.println("线程 1 再次释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
public void run() {
// 获取锁对象
try {
lock2.acquire();
System.out.println("线程 2 获取锁");
// 测试锁重入
lock2.acquire();
System.out.println("线程 2 再次获取锁");
Thread.sleep(5 * 1000);
lock2.release();
System.out.println("线程 2 释放锁");
lock2.release();
System.out.println("线程 2 再次释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
// 分布式锁初始化
public CuratorFramework getCuratorFramework () {
//重试策略,初试时间 3 秒,重试 3 次
RetryPolicy policy = new ExponentialBackoffRetry(3000, 3);
//通过工厂创建 Curator
CuratorFramework client =
CuratorFrameworkFactory.builder()
.connectString(connectString)
.connectionTimeoutMs(connectionTimeout)
.sessionTimeoutMs(sessionTimeout)
.retryPolicy(policy).build();
//开启连接
client.start();
System.out.println("zookeeper 初始化完成...");
return client;
}
}效果
1
2
3
4
5
6
7
8线程 1 获取锁
线程 1 再次获取锁
线程 1 释放锁
线程 1 再次释放锁
线程 2 获取锁
线程 2 再次获取锁
线程 2 释放锁
线程 2 再次释放锁