基于Zookeeper使用curator实现分布式锁和节点数据监听

  • Post author:
  • Post category:其他


1、添加相关依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.cxb</groupId>
    <artifactId>springboot</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot-hello</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <slf4j.version>1.6.6</slf4j.version>
        <log4j.version>1.2.12</log4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- mysql驱动 -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.23</version>
        </dependency>
        <!-- mybatis -->
        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis</artifactId>
            <version>3.4.5</version>
        </dependency>
        <!-- 链接池 -->
        <dependency>
            <groupId>commons-dbcp</groupId>
            <artifactId>commons-dbcp</artifactId>
            <version>1.4</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        <!-- log start -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.0.9</version>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>5.2.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>5.2.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
# zk配置
curator:
  retryCount: 5      #重试次数
  baseSleepTimeMs: 1000   #重试间隔时间
  connectString: 127.0.0.1:2181    # zk地址
  sessionTimeoutMs: 60000          #session超时时间
  connectionTimeoutMs: 5000         #连接超时时间

spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    data-username: root
    data-password: 123456
    url: jdbc:mysql:///activiti?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT

server:
  port: 8081

zookeeper:
  server: 127.0.0.1:2181
  sleep-time: 1000
  max-retries: 3
  session-timeout: 1500
  connection-timerout: 5000

2、编码实现

package com.cxb.springboot.common;

import java.util.concurrent.TimeUnit;

/**
 * @Classname AbstractLock
 * @Description TODO
 * @Date 2023/3/28 22:41
 * @Created by Administrator
 */
public abstract class AbstractLock<T> {

    /**
     * 锁路径
     */
    protected String lockPath;

    /**
     * 超时时间
     */
    protected long time;

    protected TimeUnit timeUnit;

    public AbstractLock(String lockPath, long time, TimeUnit timeUnit) {
        this.lockPath = lockPath;
        this.time = time;
        this.timeUnit = timeUnit;
    }

    public void setLockPath(String lockPath) {
        this.lockPath = lockPath;
    }

    public String getLockPath() {
        return lockPath;
    }

    public long getTime() {
        return time;
    }

    public void setTime(long time) {
        this.time = time;
    }

    public void setTimeUnit(TimeUnit timeUnit) {
        this.timeUnit = timeUnit;
    }

    public TimeUnit getTimeUnit() {
        return timeUnit;
    }

    /**
     * 执行业务的方法
     *
     * @return
     */
    public abstract T execute();
}
package com.cxb.springboot.common;

/**
 * @Classname ZookeeperClient
 * @Description TODO
 * @Date 2023/3/28 22:38
 * @Created by Administrator
 */
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;

@Slf4j
public class ZookeeperClient {

    private CuratorFramework curatorFramework;

    public ZookeeperClient(CuratorFramework curatorFramework) {
        this.curatorFramework = curatorFramework;
    }

    public <T> T lock(AbstractLock<T> abstractLock) {
        //获取锁路径
        String lockPath = abstractLock.getLockPath();
        //创建InterProcessMutex实例
        InterProcessMutex lock = new InterProcessMutex(curatorFramework, lockPath); //创建锁对象
        boolean success = false;
        try {
            try {
                //加锁
                success = lock.acquire(abstractLock.getTime(), abstractLock.getTimeUnit()); //获取锁
            } catch (Exception e) {
                throw new RuntimeException("尝试获取锁异常:" + e.getMessage() + ", lockPath " + lockPath);
            }
            //判断是否加锁成功
            if (success) {
                return abstractLock.execute();
            } else {
                log.info("获取锁失败,返回null");
                return null;
            }
        } finally {
            try {
                if (success) {
                    //释放锁
                    lock.release();
                }
            } catch (Exception e) {
                log.error("释放锁异常: {}, lockPath {}", e.getMessage(), lockPath);
            }
        }
    }

    //bean的销毁方法
    public void destroy() {
        try {
            log.info("ZookeeperClient销毁方法,如果zookeeper连接不为空,则关闭连接");
            if (getCuratorFramework() != null) {
                //这种方式比较优雅的关闭连接
                getCuratorFramework().close();
            }
        } catch (Exception e) {
            log.error("stop zookeeper client error {}", e.getMessage());
        }
    }

    public CuratorFramework getCuratorFramework() {
        return curatorFramework;
    }
}
package com.cxb.springboot.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 * @Classname WrapperZk
 * @Description TODO
 * @Date 2023/3/28 21:57
 * @Created by Administrator
 */
@Data
@Component
@ConfigurationProperties(prefix = "curator")
public class WrapperZk {

    private int retryCount;

    private int elapsedTimeMs;

    private String connectString;

    private int sessionTimeoutMs;

    private int connectionTimeoutMs;

    private int baseSleepTimeMs;
}
package com.cxb.springboot.config;

import com.cxb.springboot.common.ZookeeperClient;
import com.cxb.springboot.listener.ZookeeperWatcherListener;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;

import static com.cxb.springboot.service.CuratorLockService.dataPath;

/**
 * @Classname ZkConfiguration
 * @Description TODO
 * @Date 2023/3/28 21:57
 * @Created by Administrator
 */
@Configuration
@Slf4j
public class ZkConfiguration {

    @Resource
    private WrapperZk wrapperZk;

    /**
     * 其中RetryPolicy为重试策略,第一个参数为baseSleepTimeMs初始的sleep时间,
     * 用于计算之后的每次重试的sleep时间。第二个参数为maxRetries,最大重试次数。
     * @return
     */
    /*@Bean(initMethod = "start")
    public CuratorFramework curatorFramework() {
        RetryPolicy retrYPolicy = new ExponentialBackoffRetry(wrapperZk.getBaseSleepTimeMs(),
                wrapperZk.getRetryCount());
        CuratorFramework client = CuratorFrameworkFactory
                .newClient(wrapperZk.getConnectString(), retrYPolicy);
        log.info("zk curator初始化完成...");
        return client;
    }*/

    @Resource
    private ZookeeperProperties zookeeperProperties;

    @Bean
    public CuratorFramework curatorFrameworkClient() {
        //重试策略,ExponentialBackoffRetry(1000,3)这里表示等待1s重试,最大重试次数为3次
        RetryPolicy policy = new ExponentialBackoffRetry(zookeeperProperties.getSleepTime(),
                zookeeperProperties.getMaxRetries());
        //构建CuratorFramework实例
        CuratorFramework curatorFrameworkClient = CuratorFrameworkFactory
                .builder()
                .connectString(zookeeperProperties.getServer())
                .sessionTimeoutMs(zookeeperProperties.getSessionTimeout())
                .connectionTimeoutMs(zookeeperProperties.getConnectionTimeout())
                .retryPolicy(policy)
                .build();
        // 启动实例
        curatorFrameworkClient.start();
        CuratorCache curatorCache = CuratorCache.
                build(curatorFrameworkClient, dataPath, CuratorCache.Options.SINGLE_NODE_CACHE);
        CuratorCacheListener listener = CuratorCacheListener
                .builder()
                .forAll(new ZookeeperWatcherListener())
                .build();
        curatorCache.listenable().addListener(listener);
        curatorCache.start();

        return curatorFrameworkClient;
    }

    /**
     * 采用这种方式注册bean可以比较优雅的关闭连接
     */
    @Bean(destroyMethod = "destroy")
    public ZookeeperClient zookeeperClient(CuratorFramework curatorFrameworkClient) {
        return new ZookeeperClient(curatorFrameworkClient);
    }

}

package com.cxb.springboot.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 * @Classname ZookeeperProperties
 * @Description TODO
 * @Date 2023/3/28 22:34
 * @Created by Administrator
 */
@Data
@Component
@ConfigurationProperties(prefix = "zookeeper")
public class ZookeeperProperties {

    /**
     * zookeeper服务地址
     */
    private String server;

    /**
     * 重试等待时间
     */
    private int sleepTime;

    /**
     * 最大重试次数
     */
    private int maxRetries;

    /**
     * session超时时间
     */
    private int sessionTimeout;

    /**
     * 连接超时时间
     */
    private int connectionTimeout;

}
package com.cxb.springboot.controller;

import com.cxb.springboot.service.CuratorLockService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Classname CuratorController
 * @Description TODO
 * @Date 2023/3/28 22:43
 * @Created by Administrator
 */
@RequestMapping("/curator")
@RestController
public class CuratorController {

    @Autowired
    private CuratorLockService curatorLockService;

    /**
     * localhost:8081/curator/deduct?lockId=1001
     * @param lockId
     * @return
     */
    @RequestMapping("/deduct")
    public String deduct(String lockId) {
        return curatorLockService.inventoryDeduct(lockId);
    }

}

package com.cxb.springboot.controller;

import com.cxb.springboot.config.ZkConfiguration;
import com.cxb.springboot.service.CuratorLockService;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

/**
 * @Classname MicroController
 * @Description TODO
 * @Date 2023/3/28 22:03
 * @Created by Administrator
 */
@Controller
@RequestMapping("/zk")
@Slf4j
public class MicroController {

    private final static String PATH = "/rootLock";

    private final static String ID_PATH = "/root/rootId";

    @Autowired
    ZkConfiguration zkConfiguration;

    @Autowired
    private CuratorLockService curatorLockService;

    /**
     * 这里我用的是最常用的可重入排他锁,也是公平锁(InterProcessMutex)
     * InterProcessMutex:分布式可重入排它锁
     * InterProcessSemaphoreMutex:分布式排它锁
     * InterProcessReadWriteLock:分布式读写锁
     * InterProcessMultiLock:将多个锁作为单个实体管理的容器
     * @return
     * @throws Exception
     */
    @GetMapping("/lock")
    @ResponseBody
    public String getLock1() throws Exception {
        InterProcessMutex lock = new InterProcessMutex(zkConfiguration.curatorFrameworkClient(), PATH);
        for (int i = 0; i < 30; i++) {
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        log.info(Thread.currentThread().getName() + "尝试获取锁....");
                        lock.acquire();
                        log.info(Thread.currentThread().getName() + "获取锁成功....");
                        log.info(Thread.currentThread().getName() + "开始执行业务逻辑....");
                        Thread.sleep(10000);
                        lock.release();
                        log.info(Thread.currentThread().getName() + "释放锁成功....");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
            thread.start();
        }
        return "execute success";
    }

    @GetMapping("/id")
    @ResponseBody
    public String createId(String data) throws Exception {
        return curatorLockService.getDistributedId(ID_PATH, data);
    }

}

节点数据变化监听器

package com.cxb.springboot.listener;

import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;

/**
 * @Classname ZookeeperWatcherListener
 * @Description TODO
 * @Date 2023/3/28 23:11
 * @Created by Administrator
 */
public class ZookeeperWatcherListener implements CuratorCacheListener {

    @Override
    public void event(Type type, ChildData oldData, ChildData data) {
        System.out.println("事件类型: " + type + " :oldData: " + new String(oldData.getData()) + " :data: " + new String(data.getData()));
    }
}
package com.cxb.springboot.service;

/**
 * @Classname CuratorLockService
 * @Description TODO
 * @Date 2023/3/28 22:41
 * @Created by Administrator
 */

import com.cxb.springboot.common.AbstractLock;
import com.cxb.springboot.common.ZookeeperClient;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

@Slf4j
@Service
public class CuratorLockService {

    @Autowired
    private ZookeeperClient zookeeperClient;

    @Autowired
    private CuratorFramework curatorFramework;

    //库存存取的路径
    public static final String dataPath = "/root/data/stock";

    //初始化库存的路径
    public static final String initPath = "/root/init/stock";

    /**
     * 此方法系统启动执行,使用zookeeper存一个库存用于测试,这里也使用了锁。(只是一个模拟初始化库存的方法)
     */
    @PostConstruct
    public void init() {
        zookeeperClient.lock(new AbstractLock<Boolean>(initPath, 20, TimeUnit.SECONDS) {
            @Override
            public Boolean execute() {
                try {
                    //判断是否存在路径
                    Stat stat = curatorFramework.checkExists().forPath(dataPath);
                    if (stat == null) {
                        //为空则不存在,则创建并设置库存值
                        curatorFramework.create().creatingParentContainersIfNeeded().forPath(dataPath, "1000".getBytes());
                        log.info("初始化数据完成");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return false;
                }
                return true;
            }
        });
    }

    public String inventoryDeduct(String lockId) {
        //我这里是演示,实际对于不同的业务锁路径设置不同,比如支付和订单设置为"/root/pay/"和"/root/order/"
        String lockPath = "/root/frank/" + lockId;
        //调用加锁方法
        Integer result = zookeeperClient.lock(new AbstractLock<Integer>(lockPath, 10, TimeUnit.SECONDS) {
            @Override
            public Integer execute() {
                try {
                    //模拟业务处理
                    byte[] bytes = curatorFramework.getData().forPath(dataPath);
                    String data = new String(bytes);
                    int stock = Integer.parseInt(data);
                    if (stock > 0) {
                        //扣减库存
                        stock--;
                        curatorFramework.setData().forPath(dataPath, (stock + "").getBytes());
                    }
                    return stock;
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return null;
            }
        });
        if (result == null) {
            log.info("业务执行失败");
            return "业务执行失败";
        } else {
            log.info("执行成功,剩余库存:" + result);
            return "执行成功,剩余库存:" + result;
        }
    }

    /**
     * 获取分布式ID
     * @param path
     * @param data
     * @return
     * @throws Exception
     */
    public String getDistributedId(String path, String data) throws Exception {
        String seqNode = this.createTypeSeqNode(CreateMode.EPHEMERAL_SEQUENTIAL, path, data);
        System.out.println(seqNode);
        int index = seqNode.lastIndexOf(path);
        if (index >= 0) {
            index += path.length();
            return index <= seqNode.length() ? seqNode.substring(index) : "";
        }
        return seqNode;
    }

    /**
     * 创建指定类型的有序节点
     * @param nodeType
     * @param path
     * @param data
     * @return
     */
    public String createTypeSeqNode(CreateMode nodeType, String path, String data) throws Exception {
        String nodePath = curatorFramework.create().creatingParentsIfNeeded().withProtection().withMode(nodeType)
                .forPath(path, data.getBytes(StandardCharsets.UTF_8));
        return nodePath;
    }


}

package com.cxb.springboot;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringbootHelloApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringbootHelloApplication.class, args);
    }

}


代码地址



版权声明:本文为qq_33371766原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。