curator的使用

  • Post author:
  • Post category:其他


先配置一下maven

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>curator</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
<!--        junit-->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
<!--        curator-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.0.0</version>
        </dependency>

<!--        日志-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.21</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.21</version>
        </dependency>

    </dependencies>

    <repositories>
        <repository>
            <id>nexus-aliyun</id>
            <name>nexus-aliyun</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>


</project>

写一个测试类,看看连接成功不

需要在其对应的虚拟机之上打开服务端


./zkServer.sh start

//        第一种方式
        /*
        String connectString  //连接字符串
        *
        int sessionTimeoutMs  //会话超时时间
        int connectionTimeoutMs  //连接超时时间
         RetryPolicy retryPolicy   //重试策略
        * */

        RetryPolicy retryPolicy=new ExponentialBackoffRetry(3000,10);
/*
*   public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries) {
        this(baseSleepTimeMs, maxRetries, 2147483647);
    }

* */
        CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.249.135:2181",
                60 * 1000, 15 * 1000, retryPolicy);
        client.start();
//        开启连接

运行之后发现可以连接成功

//第二种连接方式,链式编程
 CuratorFramework client1 = CuratorFrameworkFactory.builder().connectString("192.168.249.135:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retryPolicy).build();
        client1.start();

可以创建一个节点

/*
* 创建节点
* */
    @Test
    public void testCreate() throws Exception {
//        基本创建
        String s = client1.create().forPath("/zhangsan");
        // String s = client1.create().forPath("/lisi","hhh".getBytes());
        System.out.println(s);

    }
//在这个实例之中需要创建一个client1的全局变量
//在连接之前使用  @Before这个注解,让其测试之前进行连接
//在服务器的客户端查看
    @After
    public void close(){
        if(client1!=null){
            client1.close();
        }
    }
[zk: localhost:2181(CONNECTED) 7] get /wh/lisi
hhh
[zk: localhost:2181(CONNECTED) 8] get /wh/zhangsan
192.168.249.1

查询

/**
     * 查询数据 get
     * 查询节点 ls
     * 查询节点信息 ls-s
     */
    @Test
    public void testget() throws Exception {
        byte[] app1s = client1.getData().forPath("/lisi");
        System.out.println(new String(app1s));

    }
//客户端命令方式查看
[zk: localhost:2181(CONNECTED) 3] get /wh/lisi
hhh
运行结果
hhh

  @Test
    public void testget1()throws Exception {
        List<String> strings = client1.getChildren().forPath("/");
        System.out.println(strings);

    }
    [zk: localhost:2181(CONNECTED) 5] ls /wh
[app1, lisi, wanghhh, zfy, zhangsan]
运行结果
[zhangsan, lisi, zfy, app1, wanghhh]

 @Test
    public void testget2()throws Exception {
        Stat stat = new Stat();
        client1.getData().storingStatIn(stat).forPath("/");
        System.out.println(stat);


    }
    运行结果
    33,33,1653559522532,1653559522532,0,5,0,0,0,5,74

修改节点

   @Test
    public void testset()throws Exception {
        client1.setData().forPath("/app1","zhang".getBytes());
    }
    
    在客户端之中查看
    [zk: localhost:2181(CONNECTED) 5] get /wh/app1
	zhang

//根据版本修改
 @Test
    public void testset1()throws Exception {
        Stat stat = new Stat();
        client1.getData().storingStatIn(stat).forPath("/app1");
        int version = stat.getVersion();
        System.out.println(version);
        Stat stat1 = client1.setData().withVersion(version).forPath("/app1", "zhah".getBytes());
        
    }

监听事件

package com.wh;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import javax.xml.bind.Marshaller;

public class CurtorWatchTest {

    private CuratorFramework client1;
    //    @Before是执行
    @Before
    public void testCurator(){

//    第二种方式
        RetryPolicy retryPolicy=new ExponentialBackoffRetry(3000,10);
        client1 = CuratorFrameworkFactory.builder().connectString("192.168.249.135:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retryPolicy).namespace("wh").build();
        client1.start();

    }

    @After
    public void close(){
        if(client1!=null){
            client1.close();
        }
    }

    /**
     * NodeCache:指定单一节点注册监听
     */
    @Test
    public void testNodeCache() throws Exception {
//        1.创建Nodecache对象
       final NodeCache nodeCache = new NodeCache(client1,"/app1");
//        2.注册监听
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                System.out.println("节点变化了");
//                获取修改节点的数据
                byte[] data = nodeCache.getCurrentData().getData();
                System.out.println(new String(data));

            }
        });
//        3.开启监听,如果设置为true 则开启监听
        nodeCache.start(true);
        while(true){

        }

    }

    /**
     * pathChildrenCache:监听某个节点的所有子节点
    * */
    @Test
    public void testNodeCache1() throws Exception {
//        1.创建监听对象
        PathChildrenCache pathChildrenCache =
                new PathChildrenCache(client1, "/app2", true);
//        2.绑定监听器
       pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {

           @Override
           public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
               System.out.println("子节点发生变化了");
               System.out.println(pathChildrenCacheEvent);

           }
       });
//        3.开启监听
        pathChildrenCache.start();

        while(true){

        }
    }
/**
进行运行,testNodeCache1
*/
[zk: localhost:2181(CONNECTED) 9] create /wh/app2/p4
Created /wh/app2/p4
[zk: localhost:2181(CONNECTED) 10] set /wh/app2/p4 waaa
[zk: localhost:2181(CONNECTED) 11] delete /wh/app2/p4

//可以看见打印信息
子节点发生变化了
PathChildrenCacheEvent{type=CHILD_ADDED, data=ChildData{path='/app2/p4', stat=142,142,1654074475099,1654074475099,0,0,0,0,0,0,142
, data=null}}
子节点发生变化了
PathChildrenCacheEvent{type=CHILD_UPDATED, data=ChildData{path='/app2/p4', stat=142,143,1654074475099,1654074518507,1,0,0,0,4,0,142
, data=[119, 97, 97, 97]}}
子节点发生变化了
PathChildrenCacheEvent{type=CHILD_REMOVED, data=ChildData{path='/app2/p4', stat=142,143,1654074475099,1654074518507,1,0,0,0,4,0,142
, data=[119, 97, 97, 97]}}


}

在以上的代码之中加入,可以在pathChildrenCache这个监听这种对变跟的类型进行判断

    PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();
//               判断类型,是否是更新数据类型
               if (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
                   byte[] data = pathChildrenCacheEvent.getData().getData();
                   System.out.println(new String(data));

               }
[zk: localhost:2181(CONNECTED) 0] create /wh/app2/p5
Created /wh/app2/p5
[zk: localhost:2181(CONNECTED) 1] set /wh/app2/p5 aaaaaaaaaaaaaaaaaa
[zk: localhost:2181(CONNECTED) 2] set /wh/app2/p5 AAAAAAAAAA

控制台输出
子节点发生变化了
PathChildrenCacheEvent{type=CHILD_ADDED, data=ChildData{path='/app2/p5', stat=149,149,1654075591897,1654075591897,0,0,0,0,0,0,149
, data=null}}
子节点发生变化了
PathChildrenCacheEvent{type=CHILD_UPDATED, data=ChildData{path='/app2/p5', stat=149,150,1654075591897,1654075612940,1,0,0,0,18,0,149
, data=[97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97]}}
aaaaaaaaaaaaaaaaaa
子节点发生变化了
PathChildrenCacheEvent{type=CHILD_UPDATED, data=ChildData{path='/app2/p5', stat=149,151,1654075591897,1654075632156,2,0,0,0,10,0,149
, data=[65, 65, 65, 65, 65, 65, 65, 65, 65, 65]}}
AAAAAAAAAA

TreeCache:监听所有的节点

    @Test
    public void testTreeCache() throws Exception {
//        1.创建监听器
        TreeCache treeCache = new TreeCache(client1, "/");
//        2.注册监听
        treeCache.getListenable().addListener(new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
                System.out.println("节点变化了");
                System.out.println(treeCacheEvent);
            }
        });
        treeCache.start();
        while(true){}
    }

分布式锁

zookeeper分布式锁的核心思想:当客户端要获取锁,则创建节点,使用完锁,删除节点。

1.客户端获取锁的时候,在根节点下lock(此节点可以任免)创建一个临时顺序节点

2.然后获取lock下面所有的子节点,客户端获取所有的子节点之后,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁。使用玩锁之后,将该节点进行删除。

3.如果发现自己创建的节点并非lock所有的节点最小的,说明自己还没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册事件监听,监听删除事件

4.如果发现比自己小的节点被删除了,则客户端的Watcher会收到相应的通知,此时在判断是否是lock子节点序号最小的,如果是则获取到了锁,如果不是则重复以上步骤获取到比自己小的一个节点。并注册监听。

简单的模拟

package com.wh;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.concurrent.TimeUnit;

public class Ticket implements Runnable {
    private int tickets=10; //模拟十张票
    private InterProcessMutex lock;
    public Ticket(){
        RetryPolicy retryPolicy=new ExponentialBackoffRetry(3000,10);
       CuratorFramework client1 = CuratorFrameworkFactory.builder().connectString("192.168.249.135:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retryPolicy).build();
        client1.start();
        lock=new InterProcessMutex(client1,"/lock");
    }

    @Override
    public void run() {
        while(true){
//            加锁
            try {
                lock.acquire(3, TimeUnit.SECONDS);
                //            获得许可
                if(tickets>0){
                    System.out.println(Thread.currentThread()+":"+tickets);
                    tickets--;
                }
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                //            释放锁
                try {
                    lock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }


        }
    }
}

package com.wh;

public class LockTest {
    public static void main(String[] args) {
        Ticket ticket = new Ticket();
//        1.创建客户端
        Thread thread = new Thread(ticket, "携程");
        Thread thread1 = new Thread(ticket, "飞猪");
        thread.start();
        thread1.start();

    }
}



版权声明:本文为qq_44017091原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。