先配置一下maven
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>curator</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<!-- junit-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<!-- curator-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
<!-- 日志-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
</dependencies>
<repositories>
<repository>
<id>nexus-aliyun</id>
<name>nexus-aliyun</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
</project>
写一个测试类,看看连接成功不
需要在其对应的虚拟机之上打开服务端
./zkServer.sh start
// 第一种方式
/*
String connectString //连接字符串
*
int sessionTimeoutMs //会话超时时间
int connectionTimeoutMs //连接超时时间
RetryPolicy retryPolicy //重试策略
* */
RetryPolicy retryPolicy=new ExponentialBackoffRetry(3000,10);
/*
* public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries) {
this(baseSleepTimeMs, maxRetries, 2147483647);
}
* */
CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.249.135:2181",
60 * 1000, 15 * 1000, retryPolicy);
client.start();
// 开启连接
运行之后发现可以连接成功
//第二种连接方式,链式编程
CuratorFramework client1 = CuratorFrameworkFactory.builder().connectString("192.168.249.135:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retryPolicy).build();
client1.start();
可以创建一个节点
/*
* 创建节点
* */
@Test
public void testCreate() throws Exception {
// 基本创建
String s = client1.create().forPath("/zhangsan");
// String s = client1.create().forPath("/lisi","hhh".getBytes());
System.out.println(s);
}
//在这个实例之中需要创建一个client1的全局变量
//在连接之前使用 @Before这个注解,让其测试之前进行连接
//在服务器的客户端查看
@After
public void close(){
if(client1!=null){
client1.close();
}
}
[zk: localhost:2181(CONNECTED) 7] get /wh/lisi
hhh
[zk: localhost:2181(CONNECTED) 8] get /wh/zhangsan
192.168.249.1
查询
/**
* 查询数据 get
* 查询节点 ls
* 查询节点信息 ls-s
*/
@Test
public void testget() throws Exception {
byte[] app1s = client1.getData().forPath("/lisi");
System.out.println(new String(app1s));
}
//客户端命令方式查看
[zk: localhost:2181(CONNECTED) 3] get /wh/lisi
hhh
运行结果
hhh
@Test
public void testget1()throws Exception {
List<String> strings = client1.getChildren().forPath("/");
System.out.println(strings);
}
[zk: localhost:2181(CONNECTED) 5] ls /wh
[app1, lisi, wanghhh, zfy, zhangsan]
运行结果
[zhangsan, lisi, zfy, app1, wanghhh]
@Test
public void testget2()throws Exception {
Stat stat = new Stat();
client1.getData().storingStatIn(stat).forPath("/");
System.out.println(stat);
}
运行结果
33,33,1653559522532,1653559522532,0,5,0,0,0,5,74
修改节点
@Test
public void testset()throws Exception {
client1.setData().forPath("/app1","zhang".getBytes());
}
在客户端之中查看
[zk: localhost:2181(CONNECTED) 5] get /wh/app1
zhang
//根据版本修改
@Test
public void testset1()throws Exception {
Stat stat = new Stat();
client1.getData().storingStatIn(stat).forPath("/app1");
int version = stat.getVersion();
System.out.println(version);
Stat stat1 = client1.setData().withVersion(version).forPath("/app1", "zhah".getBytes());
}
监听事件
package com.wh;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import javax.xml.bind.Marshaller;
public class CurtorWatchTest {
private CuratorFramework client1;
// @Before是执行
@Before
public void testCurator(){
// 第二种方式
RetryPolicy retryPolicy=new ExponentialBackoffRetry(3000,10);
client1 = CuratorFrameworkFactory.builder().connectString("192.168.249.135:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retryPolicy).namespace("wh").build();
client1.start();
}
@After
public void close(){
if(client1!=null){
client1.close();
}
}
/**
* NodeCache:指定单一节点注册监听
*/
@Test
public void testNodeCache() throws Exception {
// 1.创建Nodecache对象
final NodeCache nodeCache = new NodeCache(client1,"/app1");
// 2.注册监听
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("节点变化了");
// 获取修改节点的数据
byte[] data = nodeCache.getCurrentData().getData();
System.out.println(new String(data));
}
});
// 3.开启监听,如果设置为true 则开启监听
nodeCache.start(true);
while(true){
}
}
/**
* pathChildrenCache:监听某个节点的所有子节点
* */
@Test
public void testNodeCache1() throws Exception {
// 1.创建监听对象
PathChildrenCache pathChildrenCache =
new PathChildrenCache(client1, "/app2", true);
// 2.绑定监听器
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
System.out.println("子节点发生变化了");
System.out.println(pathChildrenCacheEvent);
}
});
// 3.开启监听
pathChildrenCache.start();
while(true){
}
}
/**
进行运行,testNodeCache1
*/
[zk: localhost:2181(CONNECTED) 9] create /wh/app2/p4
Created /wh/app2/p4
[zk: localhost:2181(CONNECTED) 10] set /wh/app2/p4 waaa
[zk: localhost:2181(CONNECTED) 11] delete /wh/app2/p4
//可以看见打印信息
子节点发生变化了
PathChildrenCacheEvent{type=CHILD_ADDED, data=ChildData{path='/app2/p4', stat=142,142,1654074475099,1654074475099,0,0,0,0,0,0,142
, data=null}}
子节点发生变化了
PathChildrenCacheEvent{type=CHILD_UPDATED, data=ChildData{path='/app2/p4', stat=142,143,1654074475099,1654074518507,1,0,0,0,4,0,142
, data=[119, 97, 97, 97]}}
子节点发生变化了
PathChildrenCacheEvent{type=CHILD_REMOVED, data=ChildData{path='/app2/p4', stat=142,143,1654074475099,1654074518507,1,0,0,0,4,0,142
, data=[119, 97, 97, 97]}}
}
在以上的代码之中加入,可以在pathChildrenCache这个监听这种对变跟的类型进行判断
PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();
// 判断类型,是否是更新数据类型
if (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
byte[] data = pathChildrenCacheEvent.getData().getData();
System.out.println(new String(data));
}
[zk: localhost:2181(CONNECTED) 0] create /wh/app2/p5
Created /wh/app2/p5
[zk: localhost:2181(CONNECTED) 1] set /wh/app2/p5 aaaaaaaaaaaaaaaaaa
[zk: localhost:2181(CONNECTED) 2] set /wh/app2/p5 AAAAAAAAAA
控制台输出
子节点发生变化了
PathChildrenCacheEvent{type=CHILD_ADDED, data=ChildData{path='/app2/p5', stat=149,149,1654075591897,1654075591897,0,0,0,0,0,0,149
, data=null}}
子节点发生变化了
PathChildrenCacheEvent{type=CHILD_UPDATED, data=ChildData{path='/app2/p5', stat=149,150,1654075591897,1654075612940,1,0,0,0,18,0,149
, data=[97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97]}}
aaaaaaaaaaaaaaaaaa
子节点发生变化了
PathChildrenCacheEvent{type=CHILD_UPDATED, data=ChildData{path='/app2/p5', stat=149,151,1654075591897,1654075632156,2,0,0,0,10,0,149
, data=[65, 65, 65, 65, 65, 65, 65, 65, 65, 65]}}
AAAAAAAAAA
TreeCache:监听所有的节点
@Test
public void testTreeCache() throws Exception {
// 1.创建监听器
TreeCache treeCache = new TreeCache(client1, "/");
// 2.注册监听
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
System.out.println("节点变化了");
System.out.println(treeCacheEvent);
}
});
treeCache.start();
while(true){}
}
分布式锁
zookeeper分布式锁的核心思想:当客户端要获取锁,则创建节点,使用完锁,删除节点。
1.客户端获取锁的时候,在根节点下lock(此节点可以任免)创建一个临时顺序节点
2.然后获取lock下面所有的子节点,客户端获取所有的子节点之后,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁。使用玩锁之后,将该节点进行删除。
3.如果发现自己创建的节点并非lock所有的节点最小的,说明自己还没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册事件监听,监听删除事件
4.如果发现比自己小的节点被删除了,则客户端的Watcher会收到相应的通知,此时在判断是否是lock子节点序号最小的,如果是则获取到了锁,如果不是则重复以上步骤获取到比自己小的一个节点。并注册监听。
简单的模拟
package com.wh;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.util.concurrent.TimeUnit;
public class Ticket implements Runnable {
private int tickets=10; //模拟十张票
private InterProcessMutex lock;
public Ticket(){
RetryPolicy retryPolicy=new ExponentialBackoffRetry(3000,10);
CuratorFramework client1 = CuratorFrameworkFactory.builder().connectString("192.168.249.135:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retryPolicy).build();
client1.start();
lock=new InterProcessMutex(client1,"/lock");
}
@Override
public void run() {
while(true){
// 加锁
try {
lock.acquire(3, TimeUnit.SECONDS);
// 获得许可
if(tickets>0){
System.out.println(Thread.currentThread()+":"+tickets);
tickets--;
}
} catch (Exception e) {
e.printStackTrace();
}finally {
// 释放锁
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
package com.wh;
public class LockTest {
public static void main(String[] args) {
Ticket ticket = new Ticket();
// 1.创建客户端
Thread thread = new Thread(ticket, "携程");
Thread thread1 = new Thread(ticket, "飞猪");
thread.start();
thread1.start();
}
}
版权声明:本文为qq_44017091原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。