1、添加相关依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.6.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.cxb</groupId>
<artifactId>springboot</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>springboot-hello</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
<slf4j.version>1.6.6</slf4j.version>
<log4j.version>1.2.12</log4j.version>
</properties>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- mysql驱动 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.23</version>
</dependency>
<!-- mybatis -->
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis</artifactId>
<version>3.4.5</version>
</dependency>
<!-- 链接池 -->
<dependency>
<groupId>commons-dbcp</groupId>
<artifactId>commons-dbcp</artifactId>
<version>1.4</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<!-- log start -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.0.9</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
# zk配置
curator:
retryCount: 5 #重试次数
baseSleepTimeMs: 1000 #重试间隔时间
connectString: 127.0.0.1:2181 # zk地址
sessionTimeoutMs: 60000 #session超时时间
connectionTimeoutMs: 5000 #连接超时时间
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
data-username: root
data-password: 123456
url: jdbc:mysql:///activiti?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT
server:
port: 8081
zookeeper:
server: 127.0.0.1:2181
sleep-time: 1000
max-retries: 3
session-timeout: 1500
connection-timerout: 5000
2、编码实现
package com.cxb.springboot.common;
import java.util.concurrent.TimeUnit;
/**
* @Classname AbstractLock
* @Description TODO
* @Date 2023/3/28 22:41
* @Created by Administrator
*/
public abstract class AbstractLock<T> {
/**
* 锁路径
*/
protected String lockPath;
/**
* 超时时间
*/
protected long time;
protected TimeUnit timeUnit;
public AbstractLock(String lockPath, long time, TimeUnit timeUnit) {
this.lockPath = lockPath;
this.time = time;
this.timeUnit = timeUnit;
}
public void setLockPath(String lockPath) {
this.lockPath = lockPath;
}
public String getLockPath() {
return lockPath;
}
public long getTime() {
return time;
}
public void setTime(long time) {
this.time = time;
}
public void setTimeUnit(TimeUnit timeUnit) {
this.timeUnit = timeUnit;
}
public TimeUnit getTimeUnit() {
return timeUnit;
}
/**
* 执行业务的方法
*
* @return
*/
public abstract T execute();
}
package com.cxb.springboot.common;
/**
* @Classname ZookeeperClient
* @Description TODO
* @Date 2023/3/28 22:38
* @Created by Administrator
*/
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
@Slf4j
public class ZookeeperClient {
private CuratorFramework curatorFramework;
public ZookeeperClient(CuratorFramework curatorFramework) {
this.curatorFramework = curatorFramework;
}
public <T> T lock(AbstractLock<T> abstractLock) {
//获取锁路径
String lockPath = abstractLock.getLockPath();
//创建InterProcessMutex实例
InterProcessMutex lock = new InterProcessMutex(curatorFramework, lockPath); //创建锁对象
boolean success = false;
try {
try {
//加锁
success = lock.acquire(abstractLock.getTime(), abstractLock.getTimeUnit()); //获取锁
} catch (Exception e) {
throw new RuntimeException("尝试获取锁异常:" + e.getMessage() + ", lockPath " + lockPath);
}
//判断是否加锁成功
if (success) {
return abstractLock.execute();
} else {
log.info("获取锁失败,返回null");
return null;
}
} finally {
try {
if (success) {
//释放锁
lock.release();
}
} catch (Exception e) {
log.error("释放锁异常: {}, lockPath {}", e.getMessage(), lockPath);
}
}
}
//bean的销毁方法
public void destroy() {
try {
log.info("ZookeeperClient销毁方法,如果zookeeper连接不为空,则关闭连接");
if (getCuratorFramework() != null) {
//这种方式比较优雅的关闭连接
getCuratorFramework().close();
}
} catch (Exception e) {
log.error("stop zookeeper client error {}", e.getMessage());
}
}
public CuratorFramework getCuratorFramework() {
return curatorFramework;
}
}
package com.cxb.springboot.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* @Classname WrapperZk
* @Description TODO
* @Date 2023/3/28 21:57
* @Created by Administrator
*/
@Data
@Component
@ConfigurationProperties(prefix = "curator")
public class WrapperZk {
private int retryCount;
private int elapsedTimeMs;
private String connectString;
private int sessionTimeoutMs;
private int connectionTimeoutMs;
private int baseSleepTimeMs;
}
package com.cxb.springboot.config;
import com.cxb.springboot.common.ZookeeperClient;
import com.cxb.springboot.listener.ZookeeperWatcherListener;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
import static com.cxb.springboot.service.CuratorLockService.dataPath;
/**
* @Classname ZkConfiguration
* @Description TODO
* @Date 2023/3/28 21:57
* @Created by Administrator
*/
@Configuration
@Slf4j
public class ZkConfiguration {
@Resource
private WrapperZk wrapperZk;
/**
* 其中RetryPolicy为重试策略,第一个参数为baseSleepTimeMs初始的sleep时间,
* 用于计算之后的每次重试的sleep时间。第二个参数为maxRetries,最大重试次数。
* @return
*/
/*@Bean(initMethod = "start")
public CuratorFramework curatorFramework() {
RetryPolicy retrYPolicy = new ExponentialBackoffRetry(wrapperZk.getBaseSleepTimeMs(),
wrapperZk.getRetryCount());
CuratorFramework client = CuratorFrameworkFactory
.newClient(wrapperZk.getConnectString(), retrYPolicy);
log.info("zk curator初始化完成...");
return client;
}*/
@Resource
private ZookeeperProperties zookeeperProperties;
@Bean
public CuratorFramework curatorFrameworkClient() {
//重试策略,ExponentialBackoffRetry(1000,3)这里表示等待1s重试,最大重试次数为3次
RetryPolicy policy = new ExponentialBackoffRetry(zookeeperProperties.getSleepTime(),
zookeeperProperties.getMaxRetries());
//构建CuratorFramework实例
CuratorFramework curatorFrameworkClient = CuratorFrameworkFactory
.builder()
.connectString(zookeeperProperties.getServer())
.sessionTimeoutMs(zookeeperProperties.getSessionTimeout())
.connectionTimeoutMs(zookeeperProperties.getConnectionTimeout())
.retryPolicy(policy)
.build();
// 启动实例
curatorFrameworkClient.start();
CuratorCache curatorCache = CuratorCache.
build(curatorFrameworkClient, dataPath, CuratorCache.Options.SINGLE_NODE_CACHE);
CuratorCacheListener listener = CuratorCacheListener
.builder()
.forAll(new ZookeeperWatcherListener())
.build();
curatorCache.listenable().addListener(listener);
curatorCache.start();
return curatorFrameworkClient;
}
/**
* 采用这种方式注册bean可以比较优雅的关闭连接
*/
@Bean(destroyMethod = "destroy")
public ZookeeperClient zookeeperClient(CuratorFramework curatorFrameworkClient) {
return new ZookeeperClient(curatorFrameworkClient);
}
}
package com.cxb.springboot.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* @Classname ZookeeperProperties
* @Description TODO
* @Date 2023/3/28 22:34
* @Created by Administrator
*/
@Data
@Component
@ConfigurationProperties(prefix = "zookeeper")
public class ZookeeperProperties {
/**
* zookeeper服务地址
*/
private String server;
/**
* 重试等待时间
*/
private int sleepTime;
/**
* 最大重试次数
*/
private int maxRetries;
/**
* session超时时间
*/
private int sessionTimeout;
/**
* 连接超时时间
*/
private int connectionTimeout;
}
package com.cxb.springboot.controller;
import com.cxb.springboot.service.CuratorLockService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @Classname CuratorController
* @Description TODO
* @Date 2023/3/28 22:43
* @Created by Administrator
*/
@RequestMapping("/curator")
@RestController
public class CuratorController {
@Autowired
private CuratorLockService curatorLockService;
/**
* localhost:8081/curator/deduct?lockId=1001
* @param lockId
* @return
*/
@RequestMapping("/deduct")
public String deduct(String lockId) {
return curatorLockService.inventoryDeduct(lockId);
}
}
package com.cxb.springboot.controller;
import com.cxb.springboot.config.ZkConfiguration;
import com.cxb.springboot.service.CuratorLockService;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
/**
* @Classname MicroController
* @Description TODO
* @Date 2023/3/28 22:03
* @Created by Administrator
*/
@Controller
@RequestMapping("/zk")
@Slf4j
public class MicroController {
private final static String PATH = "/rootLock";
private final static String ID_PATH = "/root/rootId";
@Autowired
ZkConfiguration zkConfiguration;
@Autowired
private CuratorLockService curatorLockService;
/**
* 这里我用的是最常用的可重入排他锁,也是公平锁(InterProcessMutex)
* InterProcessMutex:分布式可重入排它锁
* InterProcessSemaphoreMutex:分布式排它锁
* InterProcessReadWriteLock:分布式读写锁
* InterProcessMultiLock:将多个锁作为单个实体管理的容器
* @return
* @throws Exception
*/
@GetMapping("/lock")
@ResponseBody
public String getLock1() throws Exception {
InterProcessMutex lock = new InterProcessMutex(zkConfiguration.curatorFrameworkClient(), PATH);
for (int i = 0; i < 30; i++) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
log.info(Thread.currentThread().getName() + "尝试获取锁....");
lock.acquire();
log.info(Thread.currentThread().getName() + "获取锁成功....");
log.info(Thread.currentThread().getName() + "开始执行业务逻辑....");
Thread.sleep(10000);
lock.release();
log.info(Thread.currentThread().getName() + "释放锁成功....");
} catch (Exception e) {
e.printStackTrace();
}
}
});
thread.start();
}
return "execute success";
}
@GetMapping("/id")
@ResponseBody
public String createId(String data) throws Exception {
return curatorLockService.getDistributedId(ID_PATH, data);
}
}
节点数据变化监听器
package com.cxb.springboot.listener;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
/**
* @Classname ZookeeperWatcherListener
* @Description TODO
* @Date 2023/3/28 23:11
* @Created by Administrator
*/
public class ZookeeperWatcherListener implements CuratorCacheListener {
@Override
public void event(Type type, ChildData oldData, ChildData data) {
System.out.println("事件类型: " + type + " :oldData: " + new String(oldData.getData()) + " :data: " + new String(data.getData()));
}
}
package com.cxb.springboot.service;
/**
* @Classname CuratorLockService
* @Description TODO
* @Date 2023/3/28 22:41
* @Created by Administrator
*/
import com.cxb.springboot.common.AbstractLock;
import com.cxb.springboot.common.ZookeeperClient;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
@Slf4j
@Service
public class CuratorLockService {
@Autowired
private ZookeeperClient zookeeperClient;
@Autowired
private CuratorFramework curatorFramework;
//库存存取的路径
public static final String dataPath = "/root/data/stock";
//初始化库存的路径
public static final String initPath = "/root/init/stock";
/**
* 此方法系统启动执行,使用zookeeper存一个库存用于测试,这里也使用了锁。(只是一个模拟初始化库存的方法)
*/
@PostConstruct
public void init() {
zookeeperClient.lock(new AbstractLock<Boolean>(initPath, 20, TimeUnit.SECONDS) {
@Override
public Boolean execute() {
try {
//判断是否存在路径
Stat stat = curatorFramework.checkExists().forPath(dataPath);
if (stat == null) {
//为空则不存在,则创建并设置库存值
curatorFramework.create().creatingParentContainersIfNeeded().forPath(dataPath, "1000".getBytes());
log.info("初始化数据完成");
}
} catch (Exception e) {
e.printStackTrace();
return false;
}
return true;
}
});
}
public String inventoryDeduct(String lockId) {
//我这里是演示,实际对于不同的业务锁路径设置不同,比如支付和订单设置为"/root/pay/"和"/root/order/"
String lockPath = "/root/frank/" + lockId;
//调用加锁方法
Integer result = zookeeperClient.lock(new AbstractLock<Integer>(lockPath, 10, TimeUnit.SECONDS) {
@Override
public Integer execute() {
try {
//模拟业务处理
byte[] bytes = curatorFramework.getData().forPath(dataPath);
String data = new String(bytes);
int stock = Integer.parseInt(data);
if (stock > 0) {
//扣减库存
stock--;
curatorFramework.setData().forPath(dataPath, (stock + "").getBytes());
}
return stock;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
});
if (result == null) {
log.info("业务执行失败");
return "业务执行失败";
} else {
log.info("执行成功,剩余库存:" + result);
return "执行成功,剩余库存:" + result;
}
}
/**
* 获取分布式ID
* @param path
* @param data
* @return
* @throws Exception
*/
public String getDistributedId(String path, String data) throws Exception {
String seqNode = this.createTypeSeqNode(CreateMode.EPHEMERAL_SEQUENTIAL, path, data);
System.out.println(seqNode);
int index = seqNode.lastIndexOf(path);
if (index >= 0) {
index += path.length();
return index <= seqNode.length() ? seqNode.substring(index) : "";
}
return seqNode;
}
/**
* 创建指定类型的有序节点
* @param nodeType
* @param path
* @param data
* @return
*/
public String createTypeSeqNode(CreateMode nodeType, String path, String data) throws Exception {
String nodePath = curatorFramework.create().creatingParentsIfNeeded().withProtection().withMode(nodeType)
.forPath(path, data.getBytes(StandardCharsets.UTF_8));
return nodePath;
}
}
package com.cxb.springboot;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SpringbootHelloApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootHelloApplication.class, args);
}
}
版权声明:本文为qq_33371766原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。