Java Concurrency代码实例之四-锁

  • Post author:
  • Post category:java


本文的读者应该是已经掌握了基本的Java多线程开发技巧,但不熟悉Java Concurrency包的程序员。本文是本系列的第四篇文章,前三篇文章请看这里:


https://zhuanlan.zhihu.com/p/26724352



https://zhuanlan.zhihu.com/p/27148381



https://zhuanlan.zhihu.com/p/27338395

1. 前言

按照用途与特性,Concurrency包中包含的工具被分为六类(外加一个工具类TimeUnit),即:

1. 执行者与线程池

2. 并发队列

3. 同步工具

4. 并发集合

5. 锁

6. 原子变量

本文介绍的是其中的锁,与synchronize关键字不同,并发包中的锁,其阻塞机制是调用Unsafe类提供的park和unpark方法,结合CAS原子语句,实现了比synchronize更加高效灵活的各种锁。

2. Unsafe类的park和unpark

在前文(

https://zhuanlan.zhihu.com/p/27338395

)中曾经详细介绍过Unsafe类的指针和CAS操作,这里再介绍一下它的park和unpark操作。

public native void park(boolean var1, long var2);
public native void unpark(Object var1);

park方法用来阻塞一个线程,第一个参数用来指示后面的参数是绝对时间还是相对时间,true表示绝对时间,false表示从此刻开始后的相对时间。调用park的线程就阻塞在此处。

upark用来释放某个线程的阻塞,线程用参数var1表示。例子如下:

public class UnsafeParkExam {
    public static void main(String[] args) throws NoSuchFieldException, IllegalAccessException {
        Field f = Unsafe.class.getDeclaredField("theUnsafe");
        f.setAccessible(true);
        Unsafe unsafe = (Unsafe) f.get(null);
        Thread t1 = new Thread() {
            @Override
            public void run() {
                Thread.currentThread().setName("t1");
                System.out.println(Thread.currentThread().getName() + " before park");
                //park 100 seconds
                unsafe.park(false, TimeUnit.NANOSECONDS.convert(100, TimeUnit.SECONDS));
                System.out.println(Thread.currentThread().getName() + " after park");
            }
        };
        Thread t2 = new Thread() {
            @Override
            public void run() {
                try {
                    Thread.currentThread().setName("t2");
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println(Thread.currentThread().getName() + " unpark t1");
                    unsafe.unpark(t1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        };
        Thread t3 = new Thread() {
            @Override
            public void run() {
                Thread.currentThread().setName("t3");
                System.out.println(Thread.currentThread().getName() + " park 5 seconds");
                //park 5 seconds
                unsafe.park(true, System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(5,TimeUnit.SECONDS));
                System.out.println(Thread.currentThread().getName() + " after park");
            }
        };
        t1.start();
        t2.start();
        t3.start();
        try {
            t1.join();
            t2.join();
            t3.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

3. LockSupport

直接使用Unsafe还是有诸多不便之处,因此lock包提供了一个辅助类LockSupport来封装了park和unpark,例子如下:

public class LockSupportExam {
    public static void main(String[] args) {
        Thread t1 = new Thread() {
            @Override
            public void run() {
                Thread.currentThread().setName("t1");
                System.out.println(Thread.currentThread().getName() + " before park");
                //park 100 seconds
                LockSupport.parkNanos(TimeUnit.NANOSECONDS.convert(100, TimeUnit.SECONDS));
                System.out.println(Thread.currentThread().getName() + " after park");
            }
        };
        Thread t2 = new Thread() {
            @Override
            public void run() {
                try {
                    Thread.currentThread().setName("t2");
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println(Thread.currentThread().getName() + " unpark t1");
                    LockSupport.unpark(t1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        };
        Thread t3 = new Thread() {
            @Override
            public void run() {
                Thread.currentThread().setName("t3");
                System.out.println(Thread.currentThread().getName() + " park 5 seconds");
                //park 5 seconds
                LockSupport.parkUntil(System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(5,TimeUnit.SECONDS));
                System.out.println(Thread.currentThread().getName() + " after park");
            }
        };
        t1.start();
        t2.start();
        t3.start();
        try {
            t1.join();
            t2.join();
            t3.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

从例子中可以看出,使用LockSupport要比直接只用Unsafe更加便捷。除此之外,LockSupport还可以用来给线程设置一个Blocker对象,便于调试和检测线程,其原理是使用Unsafe的putObject方法直接设置Thread对象的parkBlocker属性,并在合适的时候读取这个Blocker对象,例子如下:

public class LockSupportBlockerExam {
    public static void main(String[] args) {
        Thread t3 = new Thread() {
            @Override
            public void run() {
                Thread.currentThread().setName("t3");
                System.out.println(Thread.currentThread().getName() + " park 5 seconds");
                //park 5 seconds, set blocker
                Object blocker = new String("Alex Wang");
                LockSupport.parkUntil(blocker, System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(5, TimeUnit.SECONDS));
                System.out.println(Thread.currentThread().getName() + " after park");
            }
        };
        t3.start();

        try {
            Object t3_blocker = null;
            while (t3_blocker == null) {
                t3_blocker = LockSupport.getBlocker(t3);
                TimeUnit.MILLISECONDS.sleep(10);
            }
            System.out.println("t3 blocker is :" + t3_blocker);
            t3.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

4. 锁的核心AQS同步器

毫不夸张的说,各种锁ReentrantLock、ReentrantReadWriteLock以及各种同步器诸如Semaphore、CountDownLatch等等,其核心都是AbstractQueuedSynchronizer(AQS同步器)。要了解AQS的原理,仅仅看代码和JDK文档是不够的,其原理在这一篇论文(

http://gee.cs.oswego.edu/dl/papers/aqs.pdf

)中详细阐述了,强烈建议仔细阅读此论文。

4.1 使用AQS写一个排它锁

由于AQS非常复杂,从源代码入手很难看懂,最好的学习方法是看它是如何被使用的。这里有一个非常简单的例子SimpleLock,实现了一个最简单的排它锁。当有线程获得锁时,其他线程只能等待;当这个线程释放锁时,另一个线程可以竞争获取锁。

public class SimpleLock {
    private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int ignore) {
            return compareAndSetState(0, 1);
        }

        @Override
        protected boolean tryRelease(int ignore) {
            setState(0);
            return true;
        }

        protected Sync() {
            super();
        }
    }

    private final Sync sync = new Sync();

    public void lock() {
        sync.acquire(1);
    }

    public void unlock() {
        sync.release(1);
    }

    private static class MyThread extends Thread {
        private final String name;
        private final SimpleLock lock;

        private MyThread(String name, SimpleLock lock) {
            this.name = name;
            this.lock = lock;
        }

        @Override
        public void run() {
            try {
                lock.lock();
                System.out.println(name + " get the lock");
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
                System.out.println(name + " release the lock");
            }
        }
    }

    public static void main(String[] args) {
        final SimpleLock mutex = new SimpleLock();
        MyThread t1 = new MyThread("t1", mutex);
        MyThread t2 = new MyThread("t2", mutex);
        MyThread t3 = new MyThread("t3", mutex);
        t1.start();
        t2.start();
        t3.start();
        try {
            t1.join();
            t2.join();
            t3.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("main thread exit!");
    }
}

运行结果表明,通过简单的几行代码,就实行了一个锁的所有功能。

根据JUC(Java Util Concurrency)作者的建议,AQS的使用方法要遵循上面这个模式。一是使用一个内部类Sync来继承AQS,并实现AQS的相关方法,一般是tryAcquire和tryRelease(排它锁),或者tryAcquireShared和tryReleaseShared(共享锁);二是在内部使用一个代理模式来实现锁的功能,这样做的好处是可以让暴露出的同步、互斥方法名由程序员自行决定,例如各种锁可以使用lock、unlock,Semaphore可以使用acquire和release,CountDownLatch可以使用await和countDown。

4.2 AQS基本原理

要实现一个同步器,需要三个条件:

1. 一个同步状态值,并可以原子化的操作这个状态值。显然,我们可以使用上一篇讲到的CAS方法来操作状态值;

2. 阻塞线程和解除阻塞线程的机制,可以用LockSupport来实现;

3. 维持一个阻塞线程的队列,并在队列的每个节点中存储线程的状态等信息。

AQS同时满足了这三个条件,首先,它提供了如下的状态值,以及相应的操作方法:

private volatile int state;

protected final int getState() {
    return state;
}

protected final void setState(int newState) {
    state = newState;
}

protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

state是一个volatile int类型,除了getter和setter方法外,还提供了一个CAS方法,提供原子化的比较更新操作。一般来说,AQS认为state为0时,同步器处于释放状态,多个线程此刻可以竞争获取同步器;state不等于0时,同步器处于已获取状态,后续线程需进入队列,等待同步器(可重入的同步器允许获取同步器的线程再次进入同步器,并使用state进行计数)。当然,很多情况下,程序员也可自己定义state的值的含义,特别是在实现读写锁时,需要将state一分为二的用。

其次,LockSupport提供了阻塞和解除阻塞的功能。因此,所有同步器的阻塞操作其实都是基于LockSupport的,也就是基于Unsafe的park和unpark方法的。

第三,AQS内部提供了一个Node类型,它是用来形成“线程等待队列”的节点类型,以及一个由Node类型组成的队列。此队列的原理留待后续章节细说。

4.3 继承AQS时需要重写的方法

从上面的例子中可以看出,通过继承时重写了两个方法tryAcquire和tryRelease,就可以实现一个具体的同步器。通过研究AQS的原理可以得知,它将一些复杂的操作封装在自己内部,留待继承者重写的方法仅有以下5个(除去构造函数):

1. tryAcquire

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

此方法是用来在多个线程竞争同步器时调用,作为AQS中acquire方法的第一个部分。若tryAcquire返回true,表明此线程获取了同步器;否则表明此线程没有获取同步器,而要进入队列等待同步器。在重写此方法时,仅需获取state的值,并通过CAS设置state的值,若成功则说明本线程竞争成功,否则说明线程竞争失败。

2. tryRelease

protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

此方法是用来释放同步器时调用,作为AQS中release方法的第一个部分。若tryRelease返回true,则表明当前线程可以正常释放同步器;否则不做其他操作。在重写此方法时,需要获取state的值,进行状态判断,然后依据具体情况将其设置为另一个值(一般设置为0)即可。

3. tryAcquireShared

protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}

此方法与tryAcquire非常类似,不同的是它被用来实现共享锁时调用。由于共享锁允许多个线程同时获取同步器,而且在释放时也可以同时释放多个线程。因此在重写tryAcquireShared时要特别注意state的定义和读写方法。

4. tryReleaseShared

protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}

此方法与tryRelease类似,不同的是它也被用来实现共享锁时调用。由于共享锁的特性,重写此方法时也要注意state的定义和读写方法。

5. isHeldExclusively

protected boolean isHeldExclusively() {
    throw new UnsupportedOperationException();
}

此方法在需要实现排它锁的Condition时重写。

4.4 排它锁的原理与流程

前面我们已经提供了一个最简单的排它锁SimpleLock,然而它的实现细节没有讲,下面我用两个图来说明,第一个图是lock的流程框架:

这里写图片描述

AQS已经把排它锁的加锁框架全部搭建好了,程序员只需要重写tryAcquire方法即可完成整个的功能。在看流程图的时候,要设想多个线程同时进入这个流程的样子。

第二个图是unlock的流程框架:

这里写图片描述

同样,AQS已经把解锁的框架全部搭建好了,程序员只需重写tryRelease方法即可实现排它锁的解锁。值得注意的是,排它锁被解锁后,仅有一个被阻塞的线程能够获得锁,其他线程依然在队列中等待。

4.5 初试共享锁

AQS不仅提供了排它锁,同时也提供了共享锁的框架。对于排它锁和共享锁,并没有明确的定义,此处根据本人自己的理解,给出一个定义。排它锁,即在一个时刻只有一个线程可以获取的锁,其他线程的加锁方法会被阻塞,直至拥有锁的线程释放该锁为止。共享锁,即在一个时刻允许多个线程同时获取的锁。根据场景的不同,可以构建多种不同类型的共享锁,有的共享锁允许定量的(n个)线程拥有锁,此外的线程在加锁时依然会被阻塞,每当一个线程释放一次锁,则后续的一个线程可以竞争获取该锁(例如Semaphore);有的共享锁会阻塞所有的线程,但一旦执行某种释放操作,则所有被阻塞的线程会被同时唤醒(例如CountDownLatch);有的共享锁需要与一个排它锁配合,实现对文件读写的各种同步(ReentrantReadWriteLock)。这些具体的同步器会在后续的章节或文章中介绍。

根据SimpleLock,我们再试着实现一个最简单的共享锁SimpleSharedLock,代码如下:

public class SimpleSharedLock {
    private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int arg) {
            return compareAndSetState(0, 1) ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            setState(0);
            return true;
        }

        protected Sync() {
            super();
        }
    }

    private final Sync sync = new Sync();

    public void lock() {
        sync.acquireShared(1);
    }

    public void unlock() {
        sync.releaseShared(1);
    }

    private static class MyThread extends Thread {
        private final String name;
        private final SimpleSharedLock lock;

        private MyThread(String name, SimpleSharedLock lock) {
            this.name = name;
            this.lock = lock;
        }

        @Override
        public void run() {
            try {
                lock.lock();
                System.out.println(name + " get the lock");
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
                System.out.println(name + " release the lock");
            }
        }
    }

    public static void main(String[] args) {
        final SimpleSharedLock lock = new SimpleSharedLock();
        MyThread t1 = new MyThread("t1", lock);
        MyThread t2 = new MyThread("t2", lock);
        MyThread t3 = new MyThread("t3", lock);
        t1.start();
        t2.start();
        t3.start();
        try {
            t1.join();
            t2.join();
            t3.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("main thread exit!");
    }
}

运行此段代码后发现很奇怪,虽然我们重写了tryAcquireShared和tryReleaseShared方法,但是SimpleSharedLock依然像一个排它锁,线程既不能同时拥有锁,也不能在释放时同时获取锁。这是因为我们对共享锁的流程以及AQS中state的含义依然不清楚的原因。

4.6 共享锁的原理与流程

首先明白一点:tryAcquireShared方法与tryAcquire方法不同,它返回的不是布尔值,而是整形。返回值为正表明该线程获取了锁,为负表明此线程获取锁失败,应该被阻塞,为0表明该锁尚未被获取。共享锁的加锁流程如下:

这里写图片描述

与排它锁的lock流程相比,共享锁的流程多了几步,即从setHeadAndPropagate,doReleaseShared和unparkSuccessor。这几步是在当前节点已经被唤醒后,将解锁状态向后传递的过程。本线程将唤醒下一个阻塞的共享线程(被共享锁阻塞的线程),然后此共享线程再唤醒下一个共享线程,这样直至所有共享线程都被唤醒。

共享锁的解锁流程如下:

这里写图片描述

我们会发现解锁流程并不复杂,且使用的doReleaseShared方法在加锁流程中已经使用过了。

现在我们再来回答,为什么在SimpleSharedLock中,解锁方法没有同时唤醒所有线程?原因就在于,重写的tryAcquireShared方法没有在加锁框架中发挥应有的作用,它在被唤醒后的第一次调用中(也就是在setHeadAndPropagate之前的那一次调用中)重新将state设置为1,也就是锁被获取了。因此下一个被唤醒的线程没有能够通过tryAcquireShared的检测从而重新被阻塞。若要弄清所有细节,你必须阅读AQS的源代码。

至此我们明白了,对state的查询和操作是设计一种共享锁的关键。

4.7 一次唤醒所有阻塞线程的共享锁

假设有这样一种共享锁,它天生就阻塞所有调用lock的线程,但一旦有其他线程调用unlock,则唤醒所有线程,那么应该这样设计:

public class SimpleSharedLock2 {
    private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int arg) {
            return getState() == 0 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            for (; ; ) {
                if (compareAndSetState(1, 0)) {
                    return true;
                }
            }
        }

        protected Sync() {
            setState(1);
        }
    }

    private final Sync sync = new Sync();

    public void lock() {
        sync.acquireShared(1);
    }

    public void unlock() {
        sync.releaseShared(1);
    }

    private static class MyThread extends Thread {
        private final String name;
        private final SimpleSharedLock2 lock;

        private MyThread(String name, SimpleSharedLock2 lock) {
            this.name = name;
            this.lock = lock;
        }

        @Override
        public void run() {
            try {
                System.out.println(name + " begin to get lock");
                lock.lock();
                System.out.println(name + " get the lock");
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        final SimpleSharedLock2 lock = new SimpleSharedLock2();
        MyThread t1 = new MyThread("t1", lock);
        MyThread t2 = new MyThread("t2", lock);
        MyThread t3 = new MyThread("t3", lock);
        t1.start();
        t2.start();
        t3.start();
        try {
            TimeUnit.SECONDS.sleep(5);
            System.out.println("main thread invoke unlock ");
            lock.unlock();
            t1.join();
            t2.join();
            t3.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("main thread exit!");
    }
}

运行此代码,我们会发现,这种共享锁果然符合设计。但是,它是一种一次性的锁,用完之后,由于tryReleaseShared不能将state的值重新设置为其他值,因此以后该锁就失效了。这个锁其实就是简化版的CountDownLatch,我会在后续文章中详细介绍它的使用。

4.8 拥有多个许可证的共享锁

SimpleSharedLock2能够一次释放多个阻塞线程,现在我们设计一种共享锁,它允许n个线程同时获取锁,就仿佛该锁有n个许可证,每个许可证允许一个线程获取锁。释放时,每个线程会还回一个许可证让后续的线程竞争。

public class SimpleSharedLock3 {
    private static class Sync extends AbstractQueuedSynchronizer {
        final int getPermits() {
            return getState();
        }

        @Override
        protected int tryAcquireShared(int arg) {
            for (; ; ) {
                int available = getState();
                int remaining = available - arg;
                if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            for (; ; ) {
                int available = getState();
                int remaining = available + arg;
                if (compareAndSetState(available, remaining)) {
                    return true;
                }
            }
        }

        protected Sync(int permits) {
            setState(permits);
        }
    }

    public SimpleSharedLock3(int permits) {
        sync = new Sync(permits);
    }

    private final Sync sync;

    public void lock() {
        sync.acquireShared(1);
    }

    public void unlock() {
        sync.releaseShared(1);
    }

    public int getPermits() {
        return sync.getPermits();
    }

    private static class MyThread extends Thread {
        private final String name;
        private final SimpleSharedLock3 lock;

        private MyThread(String name, SimpleSharedLock3 lock) {
            this.name = name;
            this.lock = lock;
        }

        @Override
        public void run() {
            try {
                lock.lock();
                System.out.println(name + " get the lock, permits = " + lock.getPermits());
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        //给予此锁2个许可
        final SimpleSharedLock3 lock = new SimpleSharedLock3(2);
        MyThread t1 = new MyThread("t1", lock);
        MyThread t2 = new MyThread("t2", lock);
        MyThread t3 = new MyThread("t3", lock);
        t1.start();
        t2.start();
        t3.start();
        try {
            TimeUnit.SECONDS.sleep(5);
            lock.unlock();
            System.out.println("main thread invoke unlock, permits = " + lock.getPermits());
            t1.join();
            t2.join();
            t3.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("main thread exit!");
    }
}

运行结果表明,这种锁符合设计,它其实就是一个简化版的Semaphore。

5. 可重入锁ReentrantLock

了解了AQS的原理之后,再来看RentrantLock就非常简单了。从实现原理上来说,它与我们之前设计的SimpleLock很类似。但是作为Java Concurrency包中使用最频繁的锁,它还有可重入性、可中断性、公平策略和Condition等特性。RentrantLock是一个排它锁,因此当一个线程获取锁之后,其他线程只能阻塞。但是若拥有锁的线程重复调用lock方法,依然可以重复进入锁。被RentrantLock阻塞的线程可以被中断,并抛出一个InterruptedException异常。

5.1 RentrantLock

与synchronize关键字相比,ReentrantLock更加灵活,唯一的不足是使用时必须使用try-finally模式来进行加解锁,下面是使用它的一个简单代码:

public class ReentrantLockExam {
    public static void main(String[] args) {
        final ReentrantLock lock = new ReentrantLock();
        ExecutorService service = Executors.newCachedThreadPool();
        for (int i = 0; i < 2; i++) {
            service.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(Thread.currentThread() + " ready to get lock");
                        lock.lock();
                        System.out.println(Thread.currentThread()+" get the lock");
                        for (int j = 0; j < 5; j++) {
                            System.out.println(Thread.currentThread()+" is running, number = "+j);
                            TimeUnit.MILLISECONDS.sleep(500);
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        lock.unlock();
                        System.out.println(Thread.currentThread()+" unlock");
                    }
                }
            });
        }
        service.shutdown();
    }
}

代码中,两个线程竞争一把RentrantLock锁,得到锁的线程持续运行,直至解锁后,另一个线程才等得到锁并运行。若我们将lock方法连续调用两次,程序依然能够正常运行;将unlock方法也连续调用两次,线程也能够正常解锁。每lock一次,拥有该锁的线程的计数增加1,每unlock一次,计数减少1,当AQS的state为0时,所有线程可以竞争锁。

5.2 Condition

Condition是与RentrantLock配合使用,来实现synchronize中的wait、notify功能的一个接口,其具体实现类为AQS中的ConditionObject。配合使用的方法如下所示(此示例来自JDK文档,本文做了扩充):

public class ConditionExam {
    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool();
        final BoundedBuffer buffer = new BoundedBuffer();
        service.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    for (int i = 0; i < 1000; i++) {
                        String str = "string" + i;
                        buffer.put(str);
                        System.out.println(Thread.currentThread() + " put " + str);
                        TimeUnit.MILLISECONDS.sleep(50);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        service.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    for (int i = 0; i < 1000; i++) {
                        String str = (String) buffer.take();
                        System.out.println(Thread.currentThread() + " take " + str);
                        TimeUnit.MILLISECONDS.sleep(50);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        service.shutdown();
    }

    private static class BoundedBuffer {
        final Lock lock = new ReentrantLock();
        final Condition notFull = lock.newCondition();
        final Condition notEmpty = lock.newCondition();

        final Object[] items = new Object[100];
        int putptr, takeptr, count;

        public void put(Object x) throws InterruptedException {
            lock.lock();
            try {
                while (count == items.length)
                    notFull.await();
                items[putptr] = x;
                if (++putptr == items.length) putptr = 0;
                ++count;
                notEmpty.signal();
            } finally {
                lock.unlock();
            }
        }

        public Object take() throws InterruptedException {
            lock.lock();
            try {
                while (count == 0)
                    notEmpty.await();
                Object x = items[takeptr];
                if (++takeptr == items.length) takeptr = 0;
                --count;
                notFull.signal();
                return x;
            } finally {
                lock.unlock();
            }
        }
    }
}

在上面的代码中,使用一个ReentrantLock以及它的两个Condition实现了对一个原生数组的安全并发访问,其中notFull条件用来控制数组不会溢出,notEmpty条件控制数组不会为空时还被取出元素。

6. 读写锁ReentrantWriteReadLock

读写锁ReentrantWriteReadLock的用法比较难以理解,因此用一个大家都熟悉的场景来说明。假设一个班级有一块黑板(共享资源),有多个老师都需要在黑板上书写。老师在写之前,先要获取写锁(writeLock),在写的时候其他的老师和学生都不能在黑板上写或者读,老师写完之后,释放写锁。学生在读黑板之前,先要获取读锁(readLock),多个学生可以同时获取读锁,因此多个学生可以同时读取黑板上的内容,在读的时候其他老师不能在黑板上写,学生读完之后,释放读锁。直至所有学生都释放完读锁,才能够由老师获取写锁。例子代码如下:

public class ReentrantReadWriteLockExam {
    public static Random rand = new Random(System.currentTimeMillis());

    public static void main(String[] args) {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        StringBuilder blackboard = new StringBuilder(100);

        ExecutorService service= Executors.newCachedThreadPool();
        service.execute(new Teacher(lock, "Chinese Teacher", blackboard));
        service.execute(new Teacher(lock, "English Teacher", blackboard));
        service.execute(new Student(lock,"Wang",blackboard));
        service.execute(new Student(lock,"Li",blackboard));
        service.execute(new Student(lock,"Zhang",blackboard));
        service.shutdown();
    }

    private static class Teacher extends Thread {
        final private ReentrantReadWriteLock lock;
        final private String name;
        final private StringBuilder blackboard;
        private AtomicInteger id = new AtomicInteger(0);

        private Teacher(ReentrantReadWriteLock lock, String name, StringBuilder blackboard) {
            this.lock = lock;
            this.name = name;
            this.blackboard = blackboard;
        }

        @Override
        public void run() {
            for (int i = 0; i < 1000; i++) {
                try {
                    lock.writeLock().lock();
                    System.out.println(name + " get write lock");
                    blackboard.delete(0, 99).append(name + " has written on the blackboard, number = " + id.getAndIncrement());
                    System.out.println(name + " write content:" + blackboard);
                    TimeUnit.MILLISECONDS.sleep(rand.nextInt(500));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.writeLock().unlock();
                    System.out.println(name + " release write lock");
                }
            }
        }
    }

    private static class Student extends Thread {
        final private ReentrantReadWriteLock lock;
        final private String name;
        final private StringBuilder blackboard;

        private Student(ReentrantReadWriteLock lock, String name, StringBuilder blackboard) {
            this.lock = lock;
            this.name = name;
            this.blackboard = blackboard;
        }

        @Override
        public void run() {
            for (int i = 0; i < 1000; i++) {
                try {
                    lock.readLock().lock();
                    System.out.println(name + " get read lock");
                    System.out.println(name + " read the blackboard:" + blackboard);
                    TimeUnit.MILLISECONDS.sleep(rand.nextInt(500));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.readLock().unlock();
                    System.out.println(name+" release the read lock");
                }
            }
        }
    }
}

总结规则如下:

1.写锁是排它锁,一旦被获取,其他竞争写锁的线程被阻塞,其他竞争读锁的线程也被阻塞;

2.写锁被释放后,其他的线程可以竞争写锁或者读锁;

3.读锁是共享锁,它可以同时被多个线程获取,但是读锁和写锁是不能同时被不同的线程获取的;

4.所有的读锁被释放后,其他的线程可以竞争写锁。

7. 小结

为了提供区别于synchronize关键的阻塞机制,Java提供了锁包。锁包的核心类是AQS,它提供了排它锁和共享锁的流程框架,通过继承此类,可以构造出各种类型的同步器。可重入锁ReentrantLock是使用最广泛的排它锁,除了拥有可重入特性外,它还提供了Condition用来模拟Object的wait和notify操作。读写锁ReentrantWriteReadLock提供了对共享资源的读写保护,可以被方便的用于各类共享资源的访问控制。



版权声明:本文为LogicTeamLeader原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。