关于分布式锁到底是个什么东西,我在这里就不多说了。
本文的分布式锁主要的构成元素有:ip,线程id,加锁次数。
ip:在集群环境下,部署在不同机器上的应用可能争抢一把锁,用ip实现互斥;
线程id:同一个应用下,不同的线程也可能争抢一把锁;
加锁次数:让锁对同一个线程是可重入的。每加一次锁,当前加锁次数加一,每释放一次锁,当前加锁次数减一。当加锁次数为0时,锁完全释放。
1、普通锁
1.1 单线程加锁解锁
//testlock为存在redis中的key,这个key存在表示,锁存在
LockUtil lock = new LockUtil("testlock");
System.out.println(lock.getLockInfo());//null
lock.tryLock();
System.out.println(lock.getLockInfo());//{"ip":"169.254.43.193","threadId":"1","count":1}
lock.unLock();
System.out.println(lock.getLockInfo());//null
1.2 多个线程抢锁
让多个线程抢锁,抢到锁的线程休息2秒,然后释放锁。
理论上,只有一个线程获取到锁。
for(int i=1;i<=5;i++) {
Thread t= new Thread(new Runnable() {
@Override
public void run() {
LockUtil lock = new LockUtil("testlock");
if(lock.tryLock()) {
System.out.println(System.currentTimeMillis()+"线程"+Thread.currentThread().getId()+",获取锁成功");
try {
Thread.currentThread().sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.unLock();
System.out.println(System.currentTimeMillis()+"线程"+Thread.currentThread().getId()+",释放锁");
}else {
System.out.println(System.currentTimeMillis()+"线程"+Thread.currentThread().getId()+",获取锁失败");
}
}
});
t.start();
}
这里是打印结果,符合预期。
11563623233705线程13,获取锁失败
1563623233705线程11,获取锁失败
1563623233705线程14,获取锁失败
1563623233705线程15,获取锁失败
1563623233706线程12,获取锁成功
1563623235722线程12,释放锁
2、超时锁
2.1 多个线程抢锁
设置抢锁超时时间为3秒,抢到锁的沉睡2秒。
理论上,只有两个线程抢到锁,抢到锁的时间间隔为2秒,其他线程超时。
for(int i=1;i<=5;i++) {
Thread t= new Thread(new Runnable() {
@Override
public void run() {
LockUtil lock = new LockUtil("testlock");
if(lock.tryLock(3)) { //设置超时时间为3秒
System.out.println(System.currentTimeMillis()+"线程"+Thread.currentThread().getId()+",获取锁成功");
try {
Thread.currentThread().sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.unLock();
System.out.println(System.currentTimeMillis()+"线程"+Thread.currentThread().getId()+",释放锁");
}else {
System.out.println(System.currentTimeMillis()+"线程"+Thread.currentThread().getId()+",获取锁失败");
}
}
});
t.start();
}
这里是打印结果,符合预期。
1563624210126线程15,获取锁成功
1563624212130线程15,释放锁
1563624212137线程11,获取锁成功
1563624212602线程12,获取锁失败
1563624212609线程14,获取锁失败
1563624212610线程13,获取锁失败
1563624214142线程11,释放锁
3、乐观锁
理论上都可以获得锁
for(int i=1;i<=5;i++) {
Thread t= new Thread(new Runnable() {
@Override
public void run() {
LockUtil lock = new LockUtil("testlock");
lock.setSpin(true);//开启乐观锁,在LockUtil中设置了获取锁次数最大为100,可以自行调节或着设置为无限循环
if(lock.tryLock()) {
System.out.println(System.currentTimeMillis()+"线程"+Thread.currentThread().getId()+",获取锁成功");
lock.unLock();
System.out.println(System.currentTimeMillis()+"线程"+Thread.currentThread().getId()+",释放锁");
}else {
System.out.println(System.currentTimeMillis()+"线程"+Thread.currentThread().getId()+",获取锁失败");
}
}
});
t.start();
}
符合预期。
1563639403104线程15,获取锁成功
1563639403118线程15,释放锁
1563639403135线程14,获取锁成功
1563639403143线程14,释放锁
1563639403151线程13,获取锁成功
1563639403155线程13,释放锁
1563639403167线程11,获取锁成功
1563639403172线程11,释放锁
1563639403196线程12,获取锁成功
1563639403200线程12,释放锁
java实现分布式锁代码
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import net.sf.json.JSONObject;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Response;
import redis.clients.jedis.Transaction;
public class LockUtil {
// private RedisUtil redisUtil=new RedisUtil();
Jedis jedis = new Jedis("localhost");
private String lockKey;
private boolean spin=false;
private int maxSpinCount=100;
private int outTime=5;//锁过期时间,默认为5秒
private final int threadSleepTimeMills=10;//10毫秒
public LockUtil(String lockKey){
this.lockKey=lockKey;
}
@SuppressWarnings("static-access")
public boolean tryLock(){
boolean flag=false;
if(spin){
for(int i=1;i<=maxSpinCount;i++){
flag=lock();
if(flag){
break;
}
try {
Thread.currentThread().sleep(threadSleepTimeMills);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}else{
flag=lock();
}
return flag;
}
/**
*
* @param outTime
* @return
*/
@SuppressWarnings("static-access")
public boolean tryLock(final int outTime){
final StopFlag stopFlag=new StopFlag();
boolean flag=false;
Thread stopThread=new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.currentThread().sleep(outTime*1000);
stopFlag.stop(true);
} catch (InterruptedException e) {
}
}
});
stopThread.setDaemon(true);//请思考这样设置的意义
stopThread.start();
while(!stopFlag.isStop()){
flag=lock();
if(flag){
stopThread.interrupt();
break;
}else{
try {
Thread.currentThread().sleep(threadSleepTimeMills);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
return flag;
}
class StopFlag{
private boolean stopFlag=false;
public boolean isStop() {
return stopFlag;
}
public void stop(boolean stopFlag) {
this.stopFlag= stopFlag;
}
}
private boolean lock() {
boolean flag=false;
// Jedis jedis=redisUtil.getRedis();
JSONObject lockJson=createLock();
Transaction tx=jedis.multi();
tx.setnx(lockKey, lockJson.toString());
tx.expire(lockKey,outTime );
List<Response<?>> res=tx.execGetResponse();
if(res != null && res.size()>0){
Response<?>re=res.get(0);
if(re.get().toString().equals("1") || re.get().toString().equals("OK")){
flag=true;
String str=jedis.get(lockKey);
lockJson=JSONObject.fromObject(str);
long count=lockJson.getLong("count");
lockJson.put("count", ++count);
jedis.set(lockKey, lockJson.toString());
}else{
flag = false;
String str=jedis.get(lockKey);
if(str == null) {
return false;
}
lockJson=JSONObject.fromObject(str);
String ip=getIp();
String threadId=Thread.currentThread().getId()+"";
if(lockJson.getString("ip") != null && lockJson.getString("ip").equals(ip)){
if(lockJson.get("threadId") != null &&
lockJson.getString("threadId").equals(threadId)){
long count=lockJson.getLong("count");
lockJson.put("count", ++count);
tx=jedis.multi();
tx.set(lockKey, lockJson.toString());
tx.expire(lockKey,outTime );
res=tx.execGetResponse();
if(res != null && res.size()>0){
re=res.get(0);
if(re.get().toString().equals("1") ||
re.get().toString().equals("OK")){
flag=true;
}
}
}
}
}
}
if(flag){
// System.out.println("锁+1:"+getLockInfo());
}
jedis.close();
return flag;
}
public void unLock(){
// Jedis jedis=redisUtil.getRedis();
String str=jedis.get(lockKey);
if(str != null){
JSONObject lockJson=JSONObject.fromObject(str);
String ip=getIp();
String threadId=Thread.currentThread().getId()+"";
if(lockJson.getString("ip") != null && lockJson.getString("ip").equals(ip)){
if(lockJson.get("threadId") != null && lockJson.getString("threadId").equals(threadId)){
int count=lockJson.getInt("count");
count--;
if(count<0){
count=0;
}
if(count==0){
jedis.del(lockKey);
// System.out.println("锁释放");
}else{
lockJson.put("count", count);
Transaction tx=jedis.multi();
tx=jedis.multi();
tx.set(lockKey, lockJson.toString());
tx.expire(lockKey,outTime );
}
}
}
}else{
//锁还没有释放,就失效了
}
jedis.close();
}
private JSONObject createLock() {
JSONObject lockJson0=new JSONObject();
String ip=getIp();
String threadId=Thread.currentThread().getId()+"";
long count=0;
lockJson0.put("ip", ip);
lockJson0.put("threadId", threadId);
lockJson0.put("count", count);
// lockJson.put("lockKey", lockKey);
// lockJson.put("createTime", System.currentTimeMillis());
return lockJson0;
}
private String getIp() {
String ip="";
try {
InetAddress ia=InetAddress.getLocalHost();
ip=ia.getHostAddress();
} catch (UnknownHostException e) {
e.printStackTrace();
}
return ip;
}
public String getLockInfo(){
// Jedis jedis=redisUtil.getRedis();
String info=jedis.get(lockKey);
jedis.close();
return info;
}
public int getOutTime() {
return outTime;
}
public void setOutTime(int outTime) {
this.outTime = outTime;
}
public void setSpin(boolean spin) {
this.spin = spin;
}
public boolean getSpin() {
return this.spin;
}
}
分布式锁的缺陷及思考
redis设置锁过期时间用多久合适?
首先,设置永久肯定是不行的,因为集群环境下,一台服务器抢到了锁,然后没来得及释放就崩了,其他服务器就永远抢不到锁了。
设置短了也不行,比如,你设置了3秒,一个线程抢到锁后执行任务的时间超过了3秒,但此时锁已经过期了,其他线程就进来了。这种情况该怎么办???
设置长了,比如10分钟,这回造成另外一个问题,一台ip为1.2.3.4机器上的id为10的线程抢到了锁,然后服务崩了,接着又重启了,重启之后又这台机器上的id为10的线程抢到锁了,这样也是不对的。虽然重启前后线程id都是10,却不是同一个线程了。
那么,这种情况又该怎么解决???
解决方案我先不说,留给大家进行思考思考。