Zookeeper

概述

Zookeeper 是一个开源的分布式的,为分布式框架提供协调服务的 Apache 项目。

Zookeeper = 文件系统 + 通知机制

特点

image-20220726162301441

  • Zookeeper:一个领导者(Leader),多个跟随者(Follower)组成的集群。
  • 集群中只要有半数以上节点存活,Zookeeper集群就能正常服务。所以Zookeeper适合安装奇数台服务器。
  • 全局数据一致:每个Server保存一份相同的数据副本,Client无论连接到哪个Server,数据都是一致的。
  • 更新请求顺序执行,来自同一个Client的更新请求按其发送顺序依次执行。
  • 数据更新原子性,一次数据更新要么成功,要么失败。
  • 实时性,在一定时间范围内,Client能读到最新数据。

数据结构

ZooKeeper 数据模型的结构与 Unix 文件系统很类似,整体上可以看作是一棵树,每个 节点称做一个 ZNode。每一个 ZNode 默认能够存

1MB 的数据,每个 ZNode 都可以通过其路径唯一标识。

image-20220726163557949

应用场景

提供的服务包括:统一命名服务、统一配置管理、统一集群管理、服务器节点动态上下线、软负载均衡等。

  • 统一命名服务

    分布式环境下,为多台服务器统一命名。

    image-20220726165133524

  • 统一配置管理

    分布式环境下,配置文件同步非常常见。一般要求一个集群中,所有节点的配置信息是 一致的,比如 Kafka 集群。对配置文件修改后,希望能够快速同步到各个节点上。可将配置信息写入 ZooKeeper 上的一个Znode。一旦Znode中的数据被修改,ZooKeeper将通知各个客户端服务器。

    image-20220726171513666

  • 统一集群管理

    分布式环境中,实时掌握每个节点的状态是必要的。ZooKeeper 可以实现实时监控节点状态变化。

    image-20220727084325200

  • 服务器动态上下线

    客户端能实时洞察到服务器上下线的变化

    image-20220727084608910

  • 软负载均衡

    在Zookeeper中记录每台服务器的访问数,让访问数最少的服务器去处理最新的客户端请求

    image-20220727084725522

下载

官网地址

image-20220727085219974

image-20220727085316904

image-20220727085356387

image-20220727085610372

本地安装

  1. JDK

    1
    yum -y install java-1.8.0-openjdk-devel.x86_64
  2. 拷贝 apache-zookeeper-3.5.7-bin.tar.gz 安装包到 Linux 系统下

  3. 解压到指定目录

    1
    tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module/
  4. 修改名称

    1
    mv apache-zookeeper-3.5.7-bin/ zookeeper-3.5.7
  5. 修改配置

    • /opt/module/zookeeper-3.5.7/conf 这个路径下的 zoo_sample.cfg 修改为 zoo.cfg

      1
      mv zoo_sample.cfg zoo.cfg
    • 打开 zoo.cfg 文件,修改 dataDir 路径

      1
      dataDir=/opt/module/zookeeper-3.5.7/zkData
    • /opt/module/zookeeper-3.5.7/这个目录上创建 zkData 文件夹

      1
      mkdir zkData
  6. 操作Zookeeper

    • 启动 Zookeeper

      1
      [root@instance-jwzbgijw zookeeper-3.5.7]# bin/zkServer.sh start
    • 查看进程是否启动

      1
      2
      3
      [root@instance-jwzbgijw zookeeper-3.5.7]# jps
      3253 Jps
      2680 QuorumPeerMain
    • 查看状态

      1
      2
      3
      4
      5
      6
      [root@instance-jwzbgijw zookeeper-3.5.7]# bin/zkServer.sh status
      /usr/bin/java
      ZooKeeper JMX enabled by default
      Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
      Client port found: 2181. Client address: localhost.
      Mode: standalone
    • 启动客户端

      1
      [root@instance-jwzbgijw zookeeper-3.5.7]# bin/zkCli.sh
    • 退出客户端

      1
      [zk: localhost:2181(CONNECTED) 0] quit
    • 停止 Zookeeper

      1
      [root@instance-jwzbgijw zookeeper-3.5.7]#  bin/zkServer.sh stop

配置参数

Zookeeper 中的配置文件 zoo.cfg 中参数含义解读

  • tickTime = 2000:通信心跳时间,Zookeeper 服务器与客户端心跳时间,单位毫秒

  • initLimit = 10:LF 初始通信时限

    Leader 和 Follower 初始连接时能容忍的最多心跳数(tickTime的数量)

  • syncLimit = 5:LF 同步通信时限

    Leader 和 Follower 之间通信时间如果超过 syncLimit * tickTime,Leader认为Follwer死掉,从服务器列表中删除 Follwer。

  • dataDir:保存Zookeeper中的数据

    默认的 tmp 目录,容易被Linux系统定期删除,所以一般不用默认的tmp目录。

  • clientPort = 2181:客户端连接端口,通常不做修改

集群

集群安装

  1. 规划

    在 hadoop101、hadoop102 和 hadoop103 三个节点上都部署 Zookeeper。

  2. 解压安装

    在 hadoop101 解压 Zookeeper 安装包到 /opt/module/ 目录下

    1
    [hadoop@hadoop101 software]$ tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module/

    修改 apache-zookeeper-3.5.7-bin 名称为 zookeeper-3.5.7

    1
    [hadoop@hadoop101 module]$ mv apache-zookeeper-3.5.7-bin/ zookeeper-3.5.7
  3. 配置服务器编号

    /opt/module/zookeeper-3.5.7/目录下创建 zkData文件夹

    1
    [hadoop@hadoop101 zookeeper-3.5.7]$ mkdir zkData

    /opt/module/zookeeper-3.5.7/zkData目录下创建 myid 的文件

    1
    [hadoop@hadoop101 zkData]$ vi myid

    在文件中添加与 server 对应的编号(注意:上下不要有空行,左右不要有空格)

    1
    1

    分发配置好的 zookeeper 到其他机器上,并分别在 hadoop102、hadoop103 上修改 myid 文件中内容为 2、3

    1
    [hadoop@hadoop101 module]$ xsync zookeeper-3.5.7
  4. 配置 zoo.cfg 文件

    重命名/opt/module/zookeeper-3.5.7/conf 这个目录下的 zoo_sample.cfgzoo.cfg

    1
    [hadoop@hadoop101 conf]$ mv zoo_sample.cfg zoo.cfg

    修改 zoo.cfg 文件

    1
    dataDir=/opt/module/zookeeper-3.5.7/zkData

    增加如下配置

    1
    2
    3
    4
    #######################cluster##########################
    server.1=hadoop101:2888:3888
    server.2=hadoop102:2888:3888
    server.3=hadoop103:2888:3888

    server.A=B:C:D

    A :表示第几号服务器,集群模式下配置一个文件 myid,这个文件在 dataDir 目录下,这个文件里面有一个数据 就是 A 的值,

    Zookeeper 启动时读取此文件,拿到里面的数据与 zoo.cfg 里面的配置信息比 较从而判断到底是哪个 server;

    B:这个服务器的地址;

    C:这个服务器 Follower 与集群中的 Leader 服务器交换信息的端口;

    D:万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服

    务器相互通信的端口。

    同步 zoo.cfg 配置文件

    1
    [hadoop@hadoop101 conf]$ xsync zoo.cfg
  5. 操作

    分别启动 Zookeeper

    1
    2
    3
    [hadoop@hadoop101 zookeeper-3.5.7]$ bin/zkServer.sh start
    [hadoop@hadoop102 zookeeper-3.5.7]$ bin/zkServer.sh start
    [hadoop@hadoop103 zookeeper-3.5.7]$ bin/zkServer.sh start

    查看状态

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    [hadoop@hadoop101 zookeeper-3.5.7]$ bin/zkServer.sh status
    ZooKeeper JMX enabled by default
    Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
    Client port found: 2181. Client address: localhost.
    Mode: follower

    [hadoop@hadoop102 zookeeper-3.5.7]$ bin/zkServer.sh status
    ZooKeeper JMX enabled by default
    Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
    Client port found: 2181. Client address: localhost.
    Mode: leader

    [hadoop@hadoop103 zookeeper-3.5.7]$ bin/zkServer.sh status
    ZooKeeper JMX enabled by default
    Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
    Client port found: 2181. Client address: localhost.
    Mode: follower

选举机制

第一次启动

image-20220728120356182

  1. 服务器1启 动,发起一次选举。服务器1投自己一票。此时服务器1票数一票,不够半数以上(3票),选举无法完成,服务器1状态保持为 LOOKING;
  2. 服务器2启动,再发起一次选举。服务器1和2分别投自己一票并交换选票信息:此时服务器1发现服务器2的myid比自己目前投票推举的(服务器1)大,更改选票为推举服务器2。此时服务器1票数0票,服务器2票数2票,没有半数以上结果,选举无法完成,服务器1,2状态保持LOOKING
  3. 服务器3启动,发起一次选举。此时服务器1和2都会更改选票为服务器3。此次投票结果:服务器1为0票,服务器2为0票,服务器3为3票。此时服务器3的票数已经超过半数,服务器3当选Leader。服务器1,2更改状态为FOLLOWING,服务器3更改状态为LEADING;
  4. 服务器4启动,发起一次选举。此时服务器1,2,3已经不是LOOKING状态,不会更改选票信息。交换选票信息结果:服务器3为3票,服务器4为 1票。此时服务器4服从多数,更改选票信息为服务器3,并更改状态为FOLLOWING;
  5. 服务器5启动,同4一样当小弟。

非第一次启动

image-20220728120411537

  1. 当ZooKeeper集群中的一台服务器出现以下两种情况之一时,就会开始进入Leader选举:

    • 服务器初始化启动
    • 服务器运行期间无法和Leader保持连接
  2. 而当一台机器进入Leader选举流程时,当前集群也可能会处于以下两种状态:

    • 集群中本来就已经存在一个Leader。 对于已经存在Leader的情况,机器试图去选举Leader时,会被告知当前服务器的Leader信息,对于该机器来说,仅仅需要和Leader机器建立连接,并进行状态同步即可。

    • 集群中确实不存在Leader

      假设ZooKeeper由5台服务器组成,SID分别为1、2、3、4、5,ZXID分别为8、8、8、7、7,并且此时SID为3的服务器是Leader。某一时刻, 3和5服务器出现故障,因此开始进行Leader选举。

      选举规则:1、EPOCH大的直接胜出;2、②EPOCH相同,事务id大的胜出;3、事务id相同,服务器id大的胜出。

      SID:服务器ID。用来唯一标识一台 ZooKeeper 集群中的机器,每台机器不能重复,和myid一致。

      ZXID:事务ID。ZXID是一个事务ID,用来标识一次服务器状态的变更。在某一时刻, 集群中的每台机器的ZXID值不一定完全一 致,这和ZooKeeper服务器对于客户端“更新请求”的处理逻辑有关。

      Epoch:每个Leader任期的代号。没有 Leader 时同一轮投票过程中的逻辑时钟值是相同的。每投完一次票这个数据就会增加

群起群停

  1. 在 hadoop101 的 /home/hadoop/bin 目录下创建脚本

    1
    [hadoop@hadoop101 bin]$ vim zk.sh

    脚本内容如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    #!/bin/bash
    case $1 in
    "start"){
    for i in hadoop101 hadoop102 hadoop103
    do
    echo ---------- zookeeper $i 启动 ------------
    ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh start"
    done
    };;
    "stop"){
    for i in hadoop101 hadoop102 hadoop103
    do
    echo ---------- zookeeper $i 停止 ------------
    ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh stop"
    done
    };;
    "status"){
    for i in hadoop101 hadoop102 hadoop103
    do
    echo ---------- zookeeper $i 状态 ------------
    ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh status"
    done
    };;
    esac
  2. 增加脚本执行权限

    1
    [hadoop@hadoop101 bin]$ chmod u+x zk.sh
  3. Zookeeper 集群群起

    1
    [hadoop@hadoop101 ~]$ zk.sh start
  4. Zookeeper 集群群停

    1
    [hadoop@hadoop101 ~]$ zk.sh stop
  5. 查看集群状态

    1
    [hadoop@hadoop101 ~]$ zk.sh status

在任意目录下都可以直接执行 zk.sh 命令,同 xsync

客户端

命令行

常用命令,具体 help 查看

命令 解释
help 显示所有操作命令
ls path 使用 ls 命令来查看当前 znode 的子节点 [可监听]
-w 监听子节点变化
-s 附加次级信息
create 普通创建
-s 含有序列
-e 临时(重启或者超时消失)
get path 获得节点的值 [可监听]
-w 监听节点内容变化
-s 附加次级信息
set 设置节点的数据值
stat 查看节点状态
delete 删除节点
deleteall 递归删除节点

节点信息

  1. 启动客户端

    1
    [hadoop@hadoop101 zookeeper-3.5.7]$ bin/zkCli.sh -server hadoop101:2181
  2. 查看当前 znode 中所包含的内容

    image-20220728124600762

  3. 查看当前节点详细数据

    image-20220728124522766

    • czxid:创建节点的事务 zxid

      每次修改 ZooKeeper 状态都会产生一个 ZooKeeper 事务 ID。事务 ID 是 ZooKeeper 中所有修改总的次序。每次修改都有唯一的 zxid,如果 zxid1 小于 zxid2,那么 zxid1 在 zxid2 之前发生。

    • ctime:znode 被创建的毫秒数(从 1970 年开始)

    • mzxid:znode 最后更新的事务 zxid

    • mtime:znode 最后修改的毫秒数(从 1970 年开始)

    • pZxid:znode 最后更新的子节点 zxid

    • cversion:znode 子节点变化号,znode 子节点修改次数

    • dataversion:znode 数据变化号

    • aclVersion:znode 访问控制列表的变化号

    • ephemeralOwner:如果是临时节点,这个是 znode 拥有者的 session id。如果不是临时节点为 0

    • dataLength:znode 的数据长度

    • numChildren:znode 子节点数量

节点类型

概述

  • 持久(Persistent):客户端和服务器端断开连接后,创建的节点不删除
  • 短暂(Ephemeral):客户端和服务器端断开连接后,创建的节点自己删除

它们各自又可以分为有序号无序号节点

说明:创建 znode 时设置顺序标识,在 znode 名称后会附加一个值,顺序号是一个单调递增的计数器,由父节点维护。

注意:在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序。

image-20220728134544554

从左至右:

  • 持久化目录节点
  • 持久化顺序编号目录节点
  • 临时目录节点
  • 临时顺序编号目录节点

实例

引号内容为数据值

  • 创建永久节点(不带序号)

    image-20220728135439433

    获得节点信息

    image-20220728135556426

  • 创建永久节点(带序号)

    先创建一个普通的根节点

    image-20220728142411615

    创建带序号的永久节点

    image-20220728142424518

    如果原来没有序号节点,序号从 0 开始依次递增。如果原节点下已有 2 个节点,则再排序时从 2 开始,以此类推。

  • 创建临时节点(不带序号)

    image-20220728143553688

  • 创建临时节点(带序号)

    image-20220728143654925

节点操作

列举几个

  • 删除节点

    1
    delete /test1/test66
  • 递归删除节点

    将节点 test2 及以下节点删除

    1
    deleteall /test1/test2
  • 查看节点状态

    1
    stat /test1

    image-20220728190046367

监听器

原理

image-20220728144437458

  1. 首先要有一个main()线程;
  2. 在 main 线程中创建 Zookeeper 客户端,这时就会创建两个线程,一个负责网络连接通信(connet),一个负责监听(listener);
  3. 通过 connect 线程将注册的监听事件发送给Zookeeper;
  4. 在 Zookeeper 的注册监听器列表中将注册的监听事件添加到列表中;
  5. Zookeeper 监听到有数据或路径变化,就会将这个消息发送给 listener 线程
  6. listener 线程内部调用 process() 方法

实例

常见监听:

  • 监听节点数据的变化 get path [watch]
  • 监听子节点增减的变化 ls path [watch]
  • 节点值变化监听

    1)在 hadoop103 主机上注册监听 /test1 节点数据变化

    1
    [zk: hadoop103:2181(CONNECTED) 1]  get -w /test1

    2)在 hadoop102 主机上修改 /test1 节点的数据

    1
    [zk: hadoop102:2181(CONNECTED) 1] set /test1 "xiugai"

    3)观察 hadoop103 主机收到数据变化的监听

    image-20220728184645921

    注意:如果再修改/test1节点的值,hadoop103 不会再收到监听。因为注册 一次,只能监听一次。想再次监听,需要再次注册。

  • 节点的子节点变化监听(路径变化)

    1)在 hadoop103 注册监听 /test1节点的子节点变化

    1
    2
    [zk: hadoop103:2181(CONNECTED) 0]  ls -w /test1
    [test2, test3]

    2)在 hadoop102 主机 /test1节点上创建子节点

    1
    2
    [zk: hadoop102:2181(CONNECTED) 2] create /test1/test66 "666"
    Created /test1/test66

    3)观察 hadoop103 主机收到子节点变化的监听

    image-20220728185420038

    注意:节点的路径变化,也是注册一次,生效一次。想多次生效,就需要多次注册。

API

即在 idea 中操作,前提集群启动着。

  1. 创建一个 maven 工程

  2. 引入依赖

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    <dependencies>
    <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>RELEASE</version>
    </dependency>
    <dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.8.2</version>
    </dependency>
    <dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.5.7</version>
    </dependency>
    </dependencies>
  3. 配置 log4j.properties

    在项目 src/main/resources 目录下,新建 log4j.properties 文件,在文件中填入如下内容:

    1
    2
    3
    4
    5
    6
    7
    8
    log4j.rootLogger=INFO, stdout 
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
    log4j.appender.logfile=org.apache.log4j.FileAppender
    log4j.appender.logfile.File=target/spring.log
    log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
    log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
  4. 测试类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    public class zkClient {

    // 本机如果没有配置映射,hadoop101等得改成对应ip地址
    private String connectString = "hadoop101:2181,hadoop102:2181,hadoop103:2181";
    // 超时时间 2s
    private int sessionTimeout = 2000;

    private ZooKeeper zkClient;

    @Before
    public void init() throws IOException, InterruptedException, KeeperException {

    // 创建客户端连接
    zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
    // 监听逻辑
    @Override
    public void process(WatchedEvent watchedEvent) {
    List<String> children = null;
    try {
    // 运行程序,当节点变化会重新获取节点
    children = zkClient.getChildren("/", true);
    for (String child : children) {
    System.out.println(child);
    }
    System.out.println("-------------------");
    } catch (Exception e) {
    e.printStackTrace();
    }
    }
    });

    }

    @Test
    public void getChildren() throws Exception {
    // 创建子节点
    // 参数 1:要创建的节点的路径; 参数 2:节点数据 ; 参数 3:节点权限 ;参数 4:节点的类型
    zkClient.create("/newNode", "111".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

    // 获取子节点 并 监听节点变化(true监听)只执行一次
    List<String> children = zkClient.getChildren("/", true);
    for (String child : children) {
    System.out.println(child);
    }
    System.out.println("-------------------");
    // 延时阻塞(不让程序那么快结束,为了继续监听节点)
    Thread.sleep(Long.MAX_VALUE);

    }

    }

    之后只要阻塞还没结束,就会继续监听

    image-20220728200402005

  5. 其它操作

    等等

    1
    2
    // 判断节点是否存在
    Stat stat = zkClient.exists("/test1", false);

写数据原理

即客户端向服务端写数据的流程

  • 写流程之写入请求发送给 Leader 节点

    image-20220728201144425

  • 写流程之写入请求发送给 Follower 节点

    image-20220728201227835

服务器动态上下线

分析

image-20220727084608910

实现

  1. 在集群创建 /servers 节点

    1
    [zk: hadoop101:2181(CONNECTED) 7] create /servers "servers"
  2. 服务器端代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    public class DistributeServer {

    private static String connectString = "hadoop101:2181,hadoop102:2181,hadoop103:2181";
    private static int sessionTimeout = 2000;
    private ZooKeeper zkClient = null;
    private String parentNode = "/servers";


    // 创建到 zk 的客户端连接
    public void getConnect() throws Exception {

    zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
    @Override
    public void process(WatchedEvent watchedEvent) {
    }
    });
    }

    // 注册服务器
    public void registerServer(String hostname) throws Exception {
    String create = zkClient.create(parentNode + "/server",
    hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
    CreateMode.EPHEMERAL_SEQUENTIAL);
    System.out.println(hostname +" is online "+ create);
    }

    // 业务功能
    public void business(String hostname) throws Exception{
    System.out.println(hostname + " is working ...");
    Thread.sleep(Long.MAX_VALUE);
    }

    public static void main(String[] args) throws Exception{

    DistributeServer server = new DistributeServer();

    // 1 获取zk连接
    server.getConnect();
    // 2 注册服务器
    server.registerServer(args[0]);
    // 3 业务
    server.business(args[0]);
    }
    }
  3. 客户端代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    public class DistributeClient {

    private static String connectString = "hadoop101:2181,hadoop102:2181,hadoop103:2181";
    private static int sessionTimeout = 2000;
    private ZooKeeper zkClient = null;
    private String parentNode = "/servers";

    // 创建到 zk 的客户端连接
    public void getConnect() throws Exception {

    zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
    @Override
    public void process(WatchedEvent watchedEvent) {
    // 再次启动监听(因为zk注册一次监听只能监听一次)
    try {
    getServerList();
    } catch (Exception e) {
    e.printStackTrace();
    }
    }
    });
    }

    // 获取服务器列表信息
    public void getServerList() throws Exception {
    // 1 获取服务器子节点信息,并且对父节点进行监听
    List<String> children = zkClient.getChildren(parentNode, true);
    // 2 存储服务器信息列表
    ArrayList<String> servers = new ArrayList<>();
    // 3 遍历所有节点,获取节点中的主机名称信息
    for (String child : children) {
    byte[] data = zkClient.getData(parentNode + "/" + child,
    false, null);
    servers.add(new String(data));
    }
    // 4 打印服务器列表信息
    System.out.println(servers);
    }

    // 业务功能
    public void business() throws Exception{
    System.out.println("client is working ...");
    Thread.sleep(Long.MAX_VALUE);
    }

    public static void main(String[] args) throws Exception {
    DistributeClient client = new DistributeClient();

    // 1 获取 zk 连接
    client.getConnect();
    // 2 业务进程启动
    client.business();
    }
    }

分布式锁

概述

​ 比如说某进程在使用资源的时候,会先去获得锁,获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,用完资源以后就

将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们

把这个分布式环境下的这个锁叫作分布式锁。

分析

image-20220729115849971

实现

原生实现

  • 代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    public class DistributedLock {

    private String connectString = "hadoop101:2181,hadoop102:2181,hadoop103:2181";
    private int sessionTimeout = 2000;
    private ZooKeeper zk;
    private String rootNode = "locks";
    private String subNode = "seq-";
    // 当前 client 等待的子节点
    private String waitPath;
    // 连接等待
    private CountDownLatch connectLatch = new CountDownLatch(1);
    // 节点删除等待
    private CountDownLatch waitLatch = new CountDownLatch(1);
    // 当前 client 创建的子节点
    private String currentNode;


    // 和 zk 服务建立连接,并创建根节点
    public DistributedLock() throws Exception {
    zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
    @Override
    public void process(WatchedEvent event) {
    // 连接建立时, 唤醒 connectLatch 线程
    if (event.getState() == Event.KeeperState.SyncConnected) {
    connectLatch.countDown();
    }
    // 发生了 waitPath 的删除事件,唤醒 waitLatch 线程
    if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath))
    {
    waitLatch.countDown();
    }
    }
    });
    // 等待连接建立
    connectLatch.await();
    //获取根节点状态
    Stat stat = zk.exists("/" + rootNode, false);
    //如果根节点不存在,则创建根节点,根节点类型为永久节点
    if (stat == null) {
    System.out.println("根节点不存在");
    zk.create("/" + rootNode, new byte[0],
    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }
    }


    // 加锁方法
    public void zkLock() {
    try {
    //在根节点下创建临时顺序节点,返回值为创建的节点路径
    currentNode = zk.create("/" + rootNode + "/" + subNode,
    null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
    CreateMode.EPHEMERAL_SEQUENTIAL);
    // wait 一小会, 让结果更清晰一些
    Thread.sleep(10);
    // 获取 /locks 下所有节点
    List<String> childrenNodes = zk.getChildren("/" + rootNode, false);
    // 列表中只有一个子节点, 那肯定就是 currentNode , 说明 client 获得锁
    if (childrenNodes.size() == 1) {
    return;
    } else {
    //对根节点下的所有临时顺序节点进行从小到大排序
    Collections.sort(childrenNodes);
    //当前节点名称
    String thisNode = currentNode.substring(("/" + rootNode + "/").length());
    //获取当前节点的位置
    int index = childrenNodes.indexOf(thisNode);
    if (index == -1) {
    System.out.println("数据异常");
    } else if (index == 0) {
    // index == 0, 说明 thisNode 在列表中最小, 当前 client 获得锁
    return;
    } else {
    // 获得排名比 currentNode 前 1 位的节点
    this.waitPath = "/" + rootNode + "/" + childrenNodes.get(index - 1);
    // 在 waitPath 上注册监听器, 当 waitPath 被删除时, zookeeper 会回调监听器的 process 方法
    zk.getData(waitPath, true, new Stat());
    //进入等待锁状态
    waitLatch.await();
    return;
    }
    }
    } catch (KeeperException e) {
    e.printStackTrace();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }

    // 解锁方法
    public void zkUnlock() {
    try {
    zk.delete(this.currentNode, -1);
    } catch (InterruptedException | KeeperException e) {
    e.printStackTrace();
    }
    }

    }
  • 测试

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    public class DistributedLockTest {

    public static void main(String[] args) throws Exception {
    // 创建分布式锁 1
    final DistributedLock lock1 = new DistributedLock();
    // 创建分布式锁 2
    final DistributedLock lock2 = new DistributedLock();

    new Thread(new Runnable() {
    @Override
    public void run() {
    // 获取锁对象
    try {
    lock1.zkLock();
    System.out.println("线程 1 获取锁");
    Thread.sleep(5 * 1000);
    lock1.zkUnlock();
    System.out.println("线程 1 释放锁");
    } catch (Exception e) {
    e.printStackTrace();
    }
    }
    }).start();

    new Thread(new Runnable() {
    @Override
    public void run() {
    // 获取锁对象
    try {
    lock2.zkLock();
    System.out.println("线程 2 获取锁");
    Thread.sleep(5 * 1000);
    lock2.zkUnlock();
    System.out.println("线程 2 释放锁");
    } catch (Exception e) {
    e.printStackTrace();
    }
    }
    }).start();
    }

    }
  • 结果

    1
    2
    3
    4
    线程 1 获取锁
    线程 1 释放锁
    线程 2 获取锁
    线程 2 释放锁

Curator框架实现

原生的 Java API 开发存在的问题:

  1. 会话连接是异步的,需要自己去处理。比如使用 CountDownLatch
  2. Watch 需要重复注册,不然就不能生效
  3. 开发的复杂性还是比较高的
  4. 不支持多节点删除和创建。需要自己去递归

Curator 是一个专门解决分布式锁的框架,解决了原生 JavaAPI 开发分布式遇到的问题。

详情请查看官方文档

Curator 案例实操:

  1. 添加依赖

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    <dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.3.0</version>
    </dependency>
    <dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.3.0</version>
    </dependency>
    <dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-client</artifactId>
    <version>4.3.0</version>
    </dependency>
  2. 代码实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    public class CuratorLockTest {

    private String rootNode = "/locks";
    private String connectString = "hadoop101:2181,hadoop102:2181,hadoop103:2181";
    private int connectionTimeout = 2000;
    private int sessionTimeout = 2000;

    public static void main(String[] args) {
    new CuratorLockTest().test();
    }


    // 测试
    private void test(){
    // 创建分布式锁 1
    final InterProcessLock lock1 = new InterProcessMutex(getCuratorFramework(), rootNode);
    // 创建分布式锁 2
    final InterProcessLock lock2 = new InterProcessMutex(getCuratorFramework(), rootNode);

    new Thread(new Runnable() {
    @Override
    public void run() {
    // 获取锁对象
    try {
    lock1.acquire();
    System.out.println("线程 1 获取锁");
    // 测试锁重入
    lock1.acquire();
    System.out.println("线程 1 再次获取锁");
    Thread.sleep(5 * 1000);
    lock1.release();
    System.out.println("线程 1 释放锁");
    lock1.release();
    System.out.println("线程 1 再次释放锁");
    } catch (Exception e) {
    e.printStackTrace();
    }
    }
    }).start();

    new Thread(new Runnable() {
    @Override
    public void run() {
    // 获取锁对象
    try {
    lock2.acquire();
    System.out.println("线程 2 获取锁");
    // 测试锁重入
    lock2.acquire();
    System.out.println("线程 2 再次获取锁");
    Thread.sleep(5 * 1000);
    lock2.release();
    System.out.println("线程 2 释放锁");
    lock2.release();
    System.out.println("线程 2 再次释放锁");
    } catch (Exception e) {
    e.printStackTrace();
    }
    }
    }).start();

    }

    // 分布式锁初始化
    public CuratorFramework getCuratorFramework () {
    //重试策略,初试时间 3 秒,重试 3 次
    RetryPolicy policy = new ExponentialBackoffRetry(3000, 3);
    //通过工厂创建 Curator
    CuratorFramework client =
    CuratorFrameworkFactory.builder()
    .connectString(connectString)
    .connectionTimeoutMs(connectionTimeout)
    .sessionTimeoutMs(sessionTimeout)
    .retryPolicy(policy).build();
    //开启连接
    client.start();
    System.out.println("zookeeper 初始化完成...");
    return client;
    }

    }
  3. 效果

    1
    2
    3
    4
    5
    6
    7
    8
    线程 1 获取锁
    线程 1 再次获取锁
    线程 1 释放锁
    线程 1 再次释放锁
    线程 2 获取锁
    线程 2 再次获取锁
    线程 2 释放锁
    线程 2 再次释放锁