基于redis的分布式锁的Java实现

  • Post author:
  • Post category:java


关于分布式锁到底是个什么东西,我在这里就不多说了。

本文的分布式锁主要的构成元素有:ip,线程id,加锁次数。

ip:在集群环境下,部署在不同机器上的应用可能争抢一把锁,用ip实现互斥;

线程id:同一个应用下,不同的线程也可能争抢一把锁;

加锁次数:让锁对同一个线程是可重入的。每加一次锁,当前加锁次数加一,每释放一次锁,当前加锁次数减一。当加锁次数为0时,锁完全释放。



1、普通锁



1.1 单线程加锁解锁

		//testlock为存在redis中的key,这个key存在表示,锁存在
		LockUtil lock = new LockUtil("testlock");
		System.out.println(lock.getLockInfo());//null
		lock.tryLock();
		System.out.println(lock.getLockInfo());//{"ip":"169.254.43.193","threadId":"1","count":1}
		lock.unLock();
		System.out.println(lock.getLockInfo());//null



1.2 多个线程抢锁

让多个线程抢锁,抢到锁的线程休息2秒,然后释放锁。

理论上,只有一个线程获取到锁。

		for(int i=1;i<=5;i++) {
			Thread t= new Thread(new Runnable() {
				@Override
				public void run() {
					LockUtil lock = new LockUtil("testlock");
					if(lock.tryLock()) {
						System.out.println(System.currentTimeMillis()+"线程"+Thread.currentThread().getId()+",获取锁成功");
						try {
							Thread.currentThread().sleep(2000);
						} catch (InterruptedException e) {
							e.printStackTrace();
						}
						lock.unLock();
						System.out.println(System.currentTimeMillis()+"线程"+Thread.currentThread().getId()+",释放锁");
					}else {
						System.out.println(System.currentTimeMillis()+"线程"+Thread.currentThread().getId()+",获取锁失败");
					}
				}
			});
			t.start();
		}

这里是打印结果,符合预期。

11563623233705线程13,获取锁失败

1563623233705线程11,获取锁失败

1563623233705线程14,获取锁失败

1563623233705线程15,获取锁失败

1563623233706线程12,获取锁成功

1563623235722线程12,释放锁



2、超时锁



2.1 多个线程抢锁

设置抢锁超时时间为3秒,抢到锁的沉睡2秒。

理论上,只有两个线程抢到锁,抢到锁的时间间隔为2秒,其他线程超时。

		for(int i=1;i<=5;i++) {
			Thread t= new Thread(new Runnable() {
				@Override
				public void run() {
					LockUtil lock = new LockUtil("testlock");
					if(lock.tryLock(3)) {  //设置超时时间为3秒
						System.out.println(System.currentTimeMillis()+"线程"+Thread.currentThread().getId()+",获取锁成功");
						try {
							Thread.currentThread().sleep(2000);
						} catch (InterruptedException e) {
							e.printStackTrace();
						}
						lock.unLock();
						System.out.println(System.currentTimeMillis()+"线程"+Thread.currentThread().getId()+",释放锁");
					}else {
						System.out.println(System.currentTimeMillis()+"线程"+Thread.currentThread().getId()+",获取锁失败");
					}
				}
			});
			t.start();
		}

这里是打印结果,符合预期。

1563624210126线程15,获取锁成功

1563624212130线程15,释放锁

1563624212137线程11,获取锁成功

1563624212602线程12,获取锁失败

1563624212609线程14,获取锁失败

1563624212610线程13,获取锁失败

1563624214142线程11,释放锁



3、乐观锁

理论上都可以获得锁

		for(int i=1;i<=5;i++) {
			Thread t= new Thread(new Runnable() {
				@Override
				public void run() {
					LockUtil lock = new LockUtil("testlock");
					lock.setSpin(true);//开启乐观锁,在LockUtil中设置了获取锁次数最大为100,可以自行调节或着设置为无限循环
					if(lock.tryLock()) {
						System.out.println(System.currentTimeMillis()+"线程"+Thread.currentThread().getId()+",获取锁成功");
						lock.unLock();
						System.out.println(System.currentTimeMillis()+"线程"+Thread.currentThread().getId()+",释放锁");
					}else {
						System.out.println(System.currentTimeMillis()+"线程"+Thread.currentThread().getId()+",获取锁失败");
					}
				}
			});
			t.start();
		}

符合预期。

1563639403104线程15,获取锁成功

1563639403118线程15,释放锁

1563639403135线程14,获取锁成功

1563639403143线程14,释放锁

1563639403151线程13,获取锁成功

1563639403155线程13,释放锁

1563639403167线程11,获取锁成功

1563639403172线程11,释放锁

1563639403196线程12,获取锁成功

1563639403200线程12,释放锁



java实现分布式锁代码

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;

import net.sf.json.JSONObject;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Response;
import redis.clients.jedis.Transaction;

public class LockUtil {
	
//	private RedisUtil redisUtil=new RedisUtil();
	
	Jedis jedis = new Jedis("localhost");
	
	private String lockKey;
	
	private boolean spin=false;
	
	private int maxSpinCount=100;
	
	private int outTime=5;//锁过期时间,默认为5秒
	
	private final int threadSleepTimeMills=10;//10毫秒
	
	public LockUtil(String lockKey){
		this.lockKey=lockKey;
	}
	
	@SuppressWarnings("static-access")
	public boolean tryLock(){
		boolean flag=false;
		if(spin){
			for(int i=1;i<=maxSpinCount;i++){
				flag=lock();
				if(flag){
					break;
				}
				try {
					Thread.currentThread().sleep(threadSleepTimeMills);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}else{
			flag=lock();
		}
		return flag;
	}
	
	/**
	 * 
	 * @param outTime
	 * @return
	 */
	@SuppressWarnings("static-access")
	public boolean tryLock(final int outTime){
		final StopFlag stopFlag=new StopFlag();
		boolean flag=false;
			Thread stopThread=new Thread(new Runnable() {
				@Override
				public void run() {
					try {
						Thread.currentThread().sleep(outTime*1000);
						stopFlag.stop(true);
					} catch (InterruptedException e) {
					}
				}
			});
			stopThread.setDaemon(true);//请思考这样设置的意义
			stopThread.start();
			while(!stopFlag.isStop()){
				flag=lock();
				if(flag){
					stopThread.interrupt();
					break;
				}else{
					try {
						Thread.currentThread().sleep(threadSleepTimeMills);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}
		return flag;
	}
	
	class StopFlag{
		
		private boolean stopFlag=false;
		
		public boolean isStop() {
			return stopFlag;
		}
		
		public void stop(boolean stopFlag) {
			this.stopFlag= stopFlag;
		}
	}
	
	private boolean lock() {
		boolean flag=false;
//		Jedis jedis=redisUtil.getRedis();
		JSONObject lockJson=createLock();
		Transaction tx=jedis.multi();
		tx.setnx(lockKey, lockJson.toString());
		tx.expire(lockKey,outTime );
		List<Response<?>> res=tx.execGetResponse();
		if(res != null && res.size()>0){
			Response<?>re=res.get(0);
			if(re.get().toString().equals("1") || re.get().toString().equals("OK")){
				flag=true;
				String str=jedis.get(lockKey); 
				lockJson=JSONObject.fromObject(str);
				long count=lockJson.getLong("count");
				lockJson.put("count", ++count);
				jedis.set(lockKey, lockJson.toString());
			}else{
				flag = false;
				String str=jedis.get(lockKey);
				if(str == null) {
					return false;
				}
				lockJson=JSONObject.fromObject(str); 
				String ip=getIp(); 
				String threadId=Thread.currentThread().getId()+"";
				if(lockJson.getString("ip") != null && lockJson.getString("ip").equals(ip)){
					if(lockJson.get("threadId") != null &&
							lockJson.getString("threadId").equals(threadId)){ 
						long count=lockJson.getLong("count"); 
						lockJson.put("count", ++count);
						tx=jedis.multi(); 
						tx.set(lockKey, lockJson.toString());
						tx.expire(lockKey,outTime ); 
						res=tx.execGetResponse(); 
						if(res != null && res.size()>0){ 
							re=res.get(0); 
							if(re.get().toString().equals("1") ||
									re.get().toString().equals("OK")){ 
								flag=true; 
							} 
						} 
					} 
				}
				 
			}
		}
		if(flag){
//			System.out.println("锁+1:"+getLockInfo());
		}
		jedis.close();
		return flag;
	}

	public void unLock(){
//		Jedis jedis=redisUtil.getRedis();
		String str=jedis.get(lockKey);
		if(str != null){
			JSONObject lockJson=JSONObject.fromObject(str);
			String ip=getIp();
			String threadId=Thread.currentThread().getId()+"";
			if(lockJson.getString("ip") != null && lockJson.getString("ip").equals(ip)){
				if(lockJson.get("threadId") != null && lockJson.getString("threadId").equals(threadId)){
					int count=lockJson.getInt("count");
					count--;
					if(count<0){
						count=0;
					}
					if(count==0){
						jedis.del(lockKey);
//						System.out.println("锁释放");
					}else{
						lockJson.put("count", count);
						Transaction tx=jedis.multi();
						tx=jedis.multi();
						tx.set(lockKey, lockJson.toString());
						tx.expire(lockKey,outTime );
					}
				}
			}
		}else{
			//锁还没有释放,就失效了
		}
		jedis.close();
	}
	
	private JSONObject createLock() {
		JSONObject lockJson0=new JSONObject();
		String ip=getIp();
		String threadId=Thread.currentThread().getId()+"";
		long count=0;
		lockJson0.put("ip", ip);
		lockJson0.put("threadId", threadId);
		lockJson0.put("count", count);
//		lockJson.put("lockKey", lockKey);
//		lockJson.put("createTime", System.currentTimeMillis());
		return lockJson0;
	}
	
	private String getIp() {
		String ip="";
		try {
			InetAddress ia=InetAddress.getLocalHost();
			ip=ia.getHostAddress();
		} catch (UnknownHostException e) {
			e.printStackTrace();
		}
		return ip;
	}
	
	public String getLockInfo(){
//		Jedis jedis=redisUtil.getRedis();
		String info=jedis.get(lockKey);
		jedis.close();
		return info;
	}
	
	public int getOutTime() {
		return outTime;
	}

	public void setOutTime(int outTime) {
		this.outTime = outTime;
	}
	
	public void setSpin(boolean spin) {
		this.spin = spin;
	}
	
	public boolean getSpin() {
		return this.spin;
	}
}



分布式锁的缺陷及思考

redis设置锁过期时间用多久合适?

首先,设置永久肯定是不行的,因为集群环境下,一台服务器抢到了锁,然后没来得及释放就崩了,其他服务器就永远抢不到锁了。

设置短了也不行,比如,你设置了3秒,一个线程抢到锁后执行任务的时间超过了3秒,但此时锁已经过期了,其他线程就进来了。这种情况该怎么办???

设置长了,比如10分钟,这回造成另外一个问题,一台ip为1.2.3.4机器上的id为10的线程抢到了锁,然后服务崩了,接着又重启了,重启之后又这台机器上的id为10的线程抢到锁了,这样也是不对的。虽然重启前后线程id都是10,却不是同一个线程了。

那么,这种情况又该怎么解决???

解决方案我先不说,留给大家进行思考思考。



版权声明:本文为m0_37133375原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。