高并发高可用之Zookeeper

  • Post author:
  • Post category:其他




Zookeeper在Linux的集群安装

  1. 下载并解压 tar xf apache-zookeeper-3.8.0.tar.gz

  2. 进入配置文件目录 cd apache-zookeeper-3.8.0/conf

  3. 将文件夹下的zoo_sample.cfg文件拷贝一份命名为zoo.cfg(必须)

    cp zoo_sample.cfg zoo.cfg

  4. 在配置文件zoo.cfg尾部加入集群节点配置

    server.1=192.168.239.128:2888:3888
    server.2=192.168.239.129:2888:3888
    server.3=192.168.239.130:2888:3888
    server.4=192.168.239.131:2888:3888
    
  5. 去到配置文件配置的DataDir目录创建myid文件

    	mkdir -p /tmp/zookeeper
    	echo 1 > /tmp/zookeeper/myid     #重定向值1到myid文件为该节点选主ID
    
  6. 添加环境变量

    vim /etc/profile

    	# 配置Zookeeper环境变量
    	export ZOOKEEPER_HOME=/usr/local/apache-zookeeper-3.8.0
    	export PATH=$PATH:$ZOOKEEPER_HOME/bin
    

    刷新配置文件的命令: source /etc/profile


CentOS7:


– 关闭防火墙命令:systemctl stop firewalld.service

– 设置永久关闭:systemctl disable firewalld.service



Zookeeper的数据结构


Zookeeper 提供了一个类似于 Linux 文件系统的树形结构,可认为是轻量级的内存文件系统。但每个节点最多只能存1M数据,所以一定不能把Zookeeper当数据库用。

基于它的数据结构,很多人用它来实现:服务发现,分布式锁,分布式领导选举,分布式配置管理等功能。



高并发高可用集群原理

zookeeper使用主从复制集群方式,主节点可进行读和写操作,从节只进行读操作。

zookeeper有自我修复功能,当领导节点(主节点)挂了,zookeeper其余节点能在几百毫秒中快速选出新的领导节点,本身自带高可用。

在这里插入图片描述

一个客户端连接会于zookeeper建立一个session。


临时节点

是基于session走 session销毁 节点数据就销毁。

  • Zookeeper是集群部署,只要集群中超过半数节点存活(否则选不出新主了),即可提供服务
  • 每次数据更新操作要么成功(半数以上节点成功),要么失败,不存在中间状态。
  • 选举时只要半数通过就成功。
  • 主节点如果挂了为了保证最终一致性,选举过程中zookeeper所有节点不对外提供服务
  • 由于从节点保证的是最终一致性,在这个一致性间隙中客户端是有可能出现读到旧数据的情况



Zookeeper的选举机制

  • 对比各个节点的事务ID与节点ID大小进行选举,一般会选举当前已启动节点中最大节点ID的节点作为Leader

  • 无论哪个节点先发现Leader主节点挂了,都会触发其余所有节点的选举机制(能够快速选举的原因)



整合SpringBoot开始&API

  1. 引入maven依赖(依赖版本必须和zookeeper安装的版本一致)

    	<dependency>
           <groupId>org.apache.zookeeper</groupId>
           <artifactId>zookeeper</artifactId>
           <version>3.8.0</version>
    	</dependency>
    
  2. springboot配置文件中进行配置

    #自定义zookeeper配置
    #配置集群节点
    zookeeper.address= 192.168.239.128:2181,192.168.239.129:2181,192.168.239.130:2181,192.168.239.131:2181
    #seesion超时时间
    zookeeper.timeout= 3000
    
    
  3. 编写zookeeper配置类

    @Configuration
    public class ZookeeperConfig {
        private static final Logger logger = LoggerFactory.getLogger(ZookeeperConfig.class);
    
        @Value("${zookeeper.address}")
        private  String connectString;
        @Value("${zookeeper.timeout}")
        private  int timeout;
    
        @Bean(name = "zkClient")
        public ZooKeeper zkClient(){
            ZooKeeper zooKeeper=null;
            try {
                final CountDownLatch countDownLatch = new CountDownLatch(1);
                //发生各类事件时,会回调watcher监听,此连接操作是异步的,执行完new语句后,直接调用后续代码
                 zooKeeper = new ZooKeeper(connectString, timeout, new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                        //event状态为连接成功
                        if(Event.KeeperState.SyncConnected==event.getState()){
                            //如果收到了服务端的响应事件,连接成功。门栓锁减1
                            countDownLatch.countDown();
                        }
                    }
                });
                countDownLatch.await();
                logger.info("【初始化ZooKeeper连接状态....】={}",zooKeeper.getState());
    
            }catch (Exception e){
                logger.error("初始化ZooKeeper连接异常....】={}",e);
            }
             return  zooKeeper;
        }
    
    
    }
    
  4. 编写对zookeeper节点的增删改查工具类

    @Component
    public class ZkApi {
    
        private static final Logger logger = LoggerFactory.getLogger(ZkApi.class);
    
        @Autowired
        private ZooKeeper zkClient;
    
    
        /**
         * 判断指定节点是否存在
         * @param path
         * @param needWatch  指定是否复用zookeeper中默认的Watcher
         * @return
         */
        public Stat exists(String path, boolean needWatch){
            try {
                return zkClient.exists(path,needWatch);
            } catch (Exception e) {
                logger.error("【断指定节点是否存在异常】{},{}",path,e);
                return null;
            }
        }
    
        /**
         *  检测结点是否存在 并设置监听事件
         *      三种监听类型: 创建,删除,更新
         *
         * @param path
         * @param watcher  传入指定的监听类
         * @return
         */
        public Stat exists(String path, Watcher watcher ){
            try {
                return zkClient.exists(path,watcher);
            } catch (Exception e) {
                logger.error("【断指定节点是否存在异常】{},{}",path,e);
                return null;
            }
        }
    
        /**
         * 创建持久化节点
         * @param path
         * @param data
         */
        public boolean createNode(String path, String data){
            try {
                zkClient.create(path,data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                return true;
            } catch (Exception e) {
                logger.error("【创建持久化节点异常】{},{},{}",path,data,e);
                return false;
            }
        }
    
    
        /**
         * 修改持久化节点
         * @param path
         * @param data
         */
        public boolean updateNode(String path, String data){
            try {
                //zk的数据版本是从0开始计数的。如果客户端传入的是-1,则表示zk服务器需要基于最新的数据进行更新。如果对zk的数据节点的更新操作没有原子性要求则可以使用-1.
                //version参数指定要更新的数据的版本, 如果version和真实的版本不同, 更新操作将失败. 指定version为-1则忽略版本检查
                zkClient.setData(path,data.getBytes(),-1);
                return true;
            } catch (Exception e) {
                logger.error("【修改持久化节点异常】{},{},{}",path,data,e);
                return false;
            }
        }
    
        /**
         * 删除持久化节点
         * @param path
         */
        public boolean deleteNode(String path){
            try {
                //version参数指定要更新的数据的版本, 如果version和真实的版本不同, 更新操作将失败. 指定version为-1则忽略版本检查
                zkClient.delete(path,-1);
                return true;
            } catch (Exception e) {
                logger.error("【删除持久化节点异常】{},{}",path,e);
                return false;
            }
        }
    
        /**
          * 获取当前节点的子节点(不包含孙子节点)
          * @param path 父节点path
          */
        public List<String> getChildren(String path) throws KeeperException, InterruptedException{
            List<String> list = zkClient.getChildren(path, false);
            return list;
        }
    
        /**
         * 获取指定节点的值
         * @param path
         * @return
         */
        public  String getData(String path,Watcher watcher){
            try {
                Stat stat=new Stat();
                byte[] bytes=zkClient.getData(path,watcher,stat);
                return  new String(bytes);
            }catch (Exception e){
                e.printStackTrace();
                return  null;
            }
        }
    
    
        /**
         * 测试方法  初始化
         */
        @PostConstruct
        public  void init(){
            String path="/zk-node-2";
            logger.info("【执行初始化测试方法。。。。。。。。。。。。】");
            createNode(path,"测试");
            String value=getData(path,new WatcherApi());
            logger.info("【执行初始化测试方法getData返回值。。。。。。。。。。。。】={}",value);
    
            // 删除节点出发 监听事件
            deleteNode(path);
    
        }
    
    }
    
  5. 编写zookeeper事件监听回调方法

    public class WatcherApi implements Watcher {
    
        private static final Logger logger = LoggerFactory.getLogger(WatcherApi.class);
        @Override
        public void process(WatchedEvent event) {
            logger.info("【Watcher监听事件】={}",event.getState());
            logger.info("【监听路径为】={}",event.getPath());
            logger.info("【监听的类型为】={}",event.getType()); //  三种监听类型: 创建,删除,更新
        }
    }
    



分布式配置、注册发现、分布式锁

  • 主要都是依靠zookeeper的Watcher监听节点的事件来实现。
  • 可重入分布式锁的编写像是一种加入zookeeper的ReentrantLock(工作中一般都会封装好,直接使用即可)



版权声明:本文为m0_48268301原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。