Zookeeper在Linux的集群安装
-
下载并解压 tar xf apache-zookeeper-3.8.0.tar.gz
-
进入配置文件目录 cd apache-zookeeper-3.8.0/conf
-
将文件夹下的zoo_sample.cfg文件拷贝一份命名为zoo.cfg(必须)
cp zoo_sample.cfg zoo.cfg -
在配置文件zoo.cfg尾部加入集群节点配置
server.1=192.168.239.128:2888:3888 server.2=192.168.239.129:2888:3888 server.3=192.168.239.130:2888:3888 server.4=192.168.239.131:2888:3888
-
去到配置文件配置的DataDir目录创建myid文件
mkdir -p /tmp/zookeeper echo 1 > /tmp/zookeeper/myid #重定向值1到myid文件为该节点选主ID
-
添加环境变量
vim /etc/profile# 配置Zookeeper环境变量 export ZOOKEEPER_HOME=/usr/local/apache-zookeeper-3.8.0 export PATH=$PATH:$ZOOKEEPER_HOME/bin
刷新配置文件的命令: source /etc/profile
CentOS7:
– 关闭防火墙命令:systemctl stop firewalld.service
– 设置永久关闭:systemctl disable firewalld.service
Zookeeper的数据结构
Zookeeper 提供了一个类似于 Linux 文件系统的树形结构,可认为是轻量级的内存文件系统。但每个节点最多只能存1M数据,所以一定不能把Zookeeper当数据库用。
基于它的数据结构,很多人用它来实现:服务发现,分布式锁,分布式领导选举,分布式配置管理等功能。
高并发高可用集群原理
zookeeper使用主从复制集群方式,主节点可进行读和写操作,从节只进行读操作。
zookeeper有自我修复功能,当领导节点(主节点)挂了,zookeeper其余节点能在几百毫秒中快速选出新的领导节点,本身自带高可用。
一个客户端连接会于zookeeper建立一个session。
临时节点
是基于session走 session销毁 节点数据就销毁。
- Zookeeper是集群部署,只要集群中超过半数节点存活(否则选不出新主了),即可提供服务
- 每次数据更新操作要么成功(半数以上节点成功),要么失败,不存在中间状态。
- 选举时只要半数通过就成功。
- 主节点如果挂了为了保证最终一致性,选举过程中zookeeper所有节点不对外提供服务
- 由于从节点保证的是最终一致性,在这个一致性间隙中客户端是有可能出现读到旧数据的情况
Zookeeper的选举机制
-
对比各个节点的事务ID与节点ID大小进行选举,一般会选举当前已启动节点中最大节点ID的节点作为Leader
-
无论哪个节点先发现Leader主节点挂了,都会触发其余所有节点的选举机制(能够快速选举的原因)
整合SpringBoot开始&API
-
引入maven依赖(依赖版本必须和zookeeper安装的版本一致)
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.8.0</version> </dependency>
-
springboot配置文件中进行配置
#自定义zookeeper配置 #配置集群节点 zookeeper.address= 192.168.239.128:2181,192.168.239.129:2181,192.168.239.130:2181,192.168.239.131:2181 #seesion超时时间 zookeeper.timeout= 3000
-
编写zookeeper配置类
@Configuration public class ZookeeperConfig { private static final Logger logger = LoggerFactory.getLogger(ZookeeperConfig.class); @Value("${zookeeper.address}") private String connectString; @Value("${zookeeper.timeout}") private int timeout; @Bean(name = "zkClient") public ZooKeeper zkClient(){ ZooKeeper zooKeeper=null; try { final CountDownLatch countDownLatch = new CountDownLatch(1); //发生各类事件时,会回调watcher监听,此连接操作是异步的,执行完new语句后,直接调用后续代码 zooKeeper = new ZooKeeper(connectString, timeout, new Watcher() { @Override public void process(WatchedEvent event) { //event状态为连接成功 if(Event.KeeperState.SyncConnected==event.getState()){ //如果收到了服务端的响应事件,连接成功。门栓锁减1 countDownLatch.countDown(); } } }); countDownLatch.await(); logger.info("【初始化ZooKeeper连接状态....】={}",zooKeeper.getState()); }catch (Exception e){ logger.error("初始化ZooKeeper连接异常....】={}",e); } return zooKeeper; } }
-
编写对zookeeper节点的增删改查工具类
@Component public class ZkApi { private static final Logger logger = LoggerFactory.getLogger(ZkApi.class); @Autowired private ZooKeeper zkClient; /** * 判断指定节点是否存在 * @param path * @param needWatch 指定是否复用zookeeper中默认的Watcher * @return */ public Stat exists(String path, boolean needWatch){ try { return zkClient.exists(path,needWatch); } catch (Exception e) { logger.error("【断指定节点是否存在异常】{},{}",path,e); return null; } } /** * 检测结点是否存在 并设置监听事件 * 三种监听类型: 创建,删除,更新 * * @param path * @param watcher 传入指定的监听类 * @return */ public Stat exists(String path, Watcher watcher ){ try { return zkClient.exists(path,watcher); } catch (Exception e) { logger.error("【断指定节点是否存在异常】{},{}",path,e); return null; } } /** * 创建持久化节点 * @param path * @param data */ public boolean createNode(String path, String data){ try { zkClient.create(path,data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); return true; } catch (Exception e) { logger.error("【创建持久化节点异常】{},{},{}",path,data,e); return false; } } /** * 修改持久化节点 * @param path * @param data */ public boolean updateNode(String path, String data){ try { //zk的数据版本是从0开始计数的。如果客户端传入的是-1,则表示zk服务器需要基于最新的数据进行更新。如果对zk的数据节点的更新操作没有原子性要求则可以使用-1. //version参数指定要更新的数据的版本, 如果version和真实的版本不同, 更新操作将失败. 指定version为-1则忽略版本检查 zkClient.setData(path,data.getBytes(),-1); return true; } catch (Exception e) { logger.error("【修改持久化节点异常】{},{},{}",path,data,e); return false; } } /** * 删除持久化节点 * @param path */ public boolean deleteNode(String path){ try { //version参数指定要更新的数据的版本, 如果version和真实的版本不同, 更新操作将失败. 指定version为-1则忽略版本检查 zkClient.delete(path,-1); return true; } catch (Exception e) { logger.error("【删除持久化节点异常】{},{}",path,e); return false; } } /** * 获取当前节点的子节点(不包含孙子节点) * @param path 父节点path */ public List<String> getChildren(String path) throws KeeperException, InterruptedException{ List<String> list = zkClient.getChildren(path, false); return list; } /** * 获取指定节点的值 * @param path * @return */ public String getData(String path,Watcher watcher){ try { Stat stat=new Stat(); byte[] bytes=zkClient.getData(path,watcher,stat); return new String(bytes); }catch (Exception e){ e.printStackTrace(); return null; } } /** * 测试方法 初始化 */ @PostConstruct public void init(){ String path="/zk-node-2"; logger.info("【执行初始化测试方法。。。。。。。。。。。。】"); createNode(path,"测试"); String value=getData(path,new WatcherApi()); logger.info("【执行初始化测试方法getData返回值。。。。。。。。。。。。】={}",value); // 删除节点出发 监听事件 deleteNode(path); } }
-
编写zookeeper事件监听回调方法
public class WatcherApi implements Watcher { private static final Logger logger = LoggerFactory.getLogger(WatcherApi.class); @Override public void process(WatchedEvent event) { logger.info("【Watcher监听事件】={}",event.getState()); logger.info("【监听路径为】={}",event.getPath()); logger.info("【监听的类型为】={}",event.getType()); // 三种监听类型: 创建,删除,更新 } }
分布式配置、注册发现、分布式锁
- 主要都是依靠zookeeper的Watcher监听节点的事件来实现。
- 可重入分布式锁的编写像是一种加入zookeeper的ReentrantLock(工作中一般都会封装好,直接使用即可)