背景
今天做需求时遇到一个统计场景,接口将用户请求记录缓存在concurrentHashmap,其中用户名作为map的Key,value为统计结果类的对象,更新此map的时候使用分段锁(通过用户名取hash值定位对应的锁)确保在相对良好性能下使得value更新线程安全。此外通过定时任务2秒一次将缓存的map保存到redis数据库后再清空map,这意味着定时任务执行的某个时候需要暂停所有写入map操作,由于map写入是使用分段锁,意味着需要阻塞所有获取分段锁线程,类似于mysql某些修改表结构的时候会阻塞行锁一样。查阅资料后发现java并无相关实现,这里手动实现一个。
分段独占锁
- 首先实现分段独占锁,在表-行锁中充当行锁的功能
- 代码如下,通过对key取hash值来定位具体的某行(锁)
public class SegReentrantLock{
private List<Lock> lockList;
public SegReentrantLock() {
this(10);
}
public SegReentrantLock(int size) {
if (size < 1){
throw new IllegalArgumentException("size must great than 0");
}
this.lockList = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
this.lockList.add( new ReentrantLock() );
}
}
public void lock(String key){
int lockIndex = this.getLockIndex(key);
this.lockList.get(lockIndex).lock();
}
public void unlock(String key){
int lockIndex = this.getLockIndex(key);
this.lockList.get(lockIndex).unlock();
}
private int getLockIndex(String key){
int lockIndex = key.hashCode() % this.lockList.size();
return Math.abs(lockIndex);
}
public boolean tryLock(String key, long time, TimeUnit timeUnit) throws InterruptedException {
int lockIndex = key.hashCode() % this.lockList.size();
if (time < 1 || timeUnit == null){
return this.lockList.get(lockIndex).tryLock();
}
return this.lockList.get(lockIndex).tryLock(time, timeUnit);
}
public boolean tryLock(String key) throws InterruptedException {
return this.tryLock(key, 0, TimeUnit.SECONDS);
}
}
行表锁
- 在实现了分段锁的基础上,通过jdk自带StampedLock来模拟表锁。
- 获取行锁的时候,先获取StampedLock的读锁,再获取行锁,释放同理
- 获取表锁的时候,直接获取StampedLock的写锁即可,如对性能有高要求,需减少获取表锁的饥饿现象,可通过StampedLock的tryOptimisticRead方法将加锁逻辑修改为“读的过程中也允许获取写锁后写入”的模式。
- 行表锁代码如下
public class TableRowsLock {
//表锁
private final StampedLock STAMPED_LOCK = new StampedLock();
//行锁
private SegReentrantLock rowLocks;
public TableRowsLock() {
this(10);
}
public TableRowsLock(int rows) {
rowLocks = new SegReentrantLock(rows);
}
/**
* @description:释放表锁
* @param time
* @param timeUnit
* @return long 时间戳,为0表示加锁失败
*/
public long tryLockTable(int time, TimeUnit timeUnit) throws InterruptedException {
return STAMPED_LOCK.tryWriteLock(time, timeUnit);
}
public long tryLockTable() throws InterruptedException {
return STAMPED_LOCK.tryWriteLock(0, TimeUnit.SECONDS);
}
/**
* @description:释放表锁
* @param stamp 获取表锁返回的时间戳
* @see #tryLockTable(int, TimeUnit)
* @return void
*/
public void unLockTable(long stamp){
STAMPED_LOCK.unlockWrite(stamp);
}
/**
* @description:加行锁
* @param key
* @param time
* @param timeUnit
* @return long 时间戳,为0表示加锁失败
*/
public long tryLockRow(String key, int time, TimeUnit timeUnit) throws InterruptedException {
//先设置表锁为已读状态
long stamp = STAMPED_LOCK.tryReadLock(time, timeUnit);
//失败,直接返回
if (stamp == 0) return stamp;
//锁行
boolean lockedRow = rowLocks.tryLock(key, time, timeUnit);
if (!lockedRow){
STAMPED_LOCK.unlockRead(stamp);
return 0;
}
return stamp;
}
public long tryLockRow(String key) throws InterruptedException {
return tryLockRow(key, 0, TimeUnit.SECONDS);
}
/**
* @description:释放行锁
* @param key
* @param stamp 获取行锁获得的时间戳
* @see #tryLockRow(String, int, TimeUnit)
*/
public void unlockRow(String key, long stamp){
STAMPED_LOCK.unlockRead(stamp);
rowLocks.unlock(key);
}
}
测试代码
public static void main(String[] args) {
//测试行锁释放和未释放情况下获取表锁
testBlockingTableLock();
//测试表锁释放和未释放情况下获取行锁
// testBlockingRowLock();
}
//测试行锁释放和未释放情况下获取表锁
private static void testBlockingTableLock(){
TableRowsLock tableLock = new TableRowsLock(2);
new Thread(() -> {
try {
long stamp1 = tableLock.tryLockRow("1");
System.out.println("未加表锁与行锁时获取行锁1,结果:" + stamp1);
long stamp2 = tableLock.tryLockRow("2");
System.out.println("未加表锁只加行锁时获取行锁2,结果:" + stamp2);
TimeUnit.SECONDS.sleep(2);
//释放行锁
tableLock.unlockRow("1", stamp1);
tableLock.unlockRow("2", stamp2);
} catch (Exception ex){
ex.printStackTrace();
}
}).start();
new Thread(() -> {
try {
TimeUnit.MILLISECONDS.sleep(500);
//获取表锁
long stamp = tableLock.tryLockTable();
System.out.println("行锁未释放,获取表锁,结果:" + stamp);
TimeUnit.SECONDS.sleep(3);
stamp = tableLock.tryLockTable();
System.out.println("行锁全部释放后 再次获取表锁,结果:" + stamp);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
//测试表锁释放和未释放情况下获取行锁
private static void testBlockingRowLock(){
TableRowsLock tableLock = new TableRowsLock(2);
new Thread(() -> {
try {
long stamp = tableLock.tryLockTable();
System.out.println("未加行锁时获取表锁,结果:" + stamp);
TimeUnit.SECONDS.sleep(2);
//释放表锁
tableLock.unLockTable(stamp);
} catch (Exception ex){
ex.printStackTrace();
}
}).start();
new Thread(() -> {
try {
TimeUnit.MILLISECONDS.sleep(500);
//获取行锁
long stamp = tableLock.tryLockRow("1");
System.out.println("表锁未释放,获取行锁,结果:" + stamp);
//表锁释放后获取行锁
TimeUnit.SECONDS.sleep(3);
stamp = tableLock.tryLockRow("1");
System.out.println("表锁释放后,再次获取行锁,结果:" + stamp);
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
版权声明:本文为qq_41633199原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。