ReentrantLock(二):正确使用Condition实现等待与通知

  • Post author:
  • Post category:其他


承接上篇文章,本篇主要简单介绍Condition的用法和部分ReentrantLock的应用

关键字synchronized与wait()和notify()/notifyAll()方法相结合可以实现等待/通知模式。

类ReentrantLock同样可以实现该功能,但是要借助于Condition对象。它具有更好的灵活性,比如可以实现多路通知功能,也就是在一个Lock对象里面可以创建多个Condition(对象监视器)实例,线程对象可以注册在指定Condition中,从而有选择性的进行线程通知,在调度线程上更加灵活

使用notify和notifyAll方法进行通知时,被通知的线程是由JVM随机选择的,但是ReentrantLock结合Condition可以实现前面介绍过的“选择性通知”,这个功能是非常重要的。

synchronized相当于整个Lock对象中只有单一的Condition对象,所有线程都注册在它一个对象上,线程开始notifyAll()时,需要通知所有waiting的线程,没有选择权,会出现相当大的效率问题

Condition 接口定义的方法:

void await() throws InterruptedException;

void awaitUninterruptibly();

long awaitNanos(long nanosTimeout) throws InterruptedException;

boolean await(long time, TimeUnit unit) throws InterruptedException;

boolean awaitUntil(Date deadline) throws InterruptedException;

void signal();

void signalAll();


ReentrantLock方法摘要

newCondition

public Condition newCondition()
返回用来与此 Lock 实例一起使用的 Condition 实例。
在使用内置监视器锁时,返回的 Condition 实例支持与 Object 的监视器方法(wait、notify 和 notifyAll)相同的用法。
  ● 在调用 Condition、waiting 或 signalling 这些方法中的任意一个方法时,如果没有保持此锁,则将抛出 IllegalMonitorStateException。
  ● 在调用 waiting 条件方法时,将释放锁,并在这些方法返回之前,重新获取该锁,将锁保持计数恢复为调用方法时所持有的值。
  ● 如果线程在等待时被中断,则等待将终止,并将抛出 InterruptedException,清除线程的中断状态。
  ● 等待线程按 FIFO 顺序收到信号。
  ● 等待方法返回的线程重新获取锁的顺序与线程最初获取锁的顺序相同,在默认情况下,未指定此顺序,但对于公平 锁,它们更倾向于那些等待时间最长的线程。

例子:

public class MyService {

    private Lock lock = new ReentrantLock();
    public Condition condition = lock.newCondition();

    public void await() {
        try {
            lock.lock();
            System.out.println(" await时间为" + System.currentTimeMillis());
            condition.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void signal() {
        try {
            lock.lock();
            System.out.println("signal时间为" + System.currentTimeMillis());
            condition.signal();
        } finally {
            lock.unlock();
        }
    }

public class ThreadA extends Thread {

    private MyService service;

    public ThreadA(MyService service) {
        super();
        this.service = service;
    }

    @Override
    public void run() {
        service.await();
    }
}

    public static void main(String[] args) throws InterruptedException {

        MyService service = new MyService();

        ThreadA a = new ThreadA(service);
        a.start();

        Thread.sleep(3000);

        service.signal();

    }

执行结果如下:

这里写图片描述

如果执行过程中出现如下异常是因为没有获得监视器对象,必须在Condition.await()方法调用之前使用lock.lock()获得同步监视器

这里写图片描述

正确用法如下:

 public void await()
    {
        try
        {
            lock.lock();
            //获取同步监视器之后
            condition.await();
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }finally{
            lock.unlock();

        }
    }

还可以使用多个Condition实现通知部分线程的用法:

public class MyService {

    private Lock lock = new ReentrantLock();
    public Condition conditionA = lock.newCondition();
    public Condition conditionB = lock.newCondition();

    public void awaitA() {
        try {
            lock.lock();
            System.out.println("begin awaitA时间为" + System.currentTimeMillis()
                    + " ThreadName=" + Thread.currentThread().getName());
            conditionA.await();
            System.out.println("  end awaitA时间为" + System.currentTimeMillis()
                    + " ThreadName=" + Thread.currentThread().getName());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void awaitB() {
        try {
            lock.lock();
            System.out.println("begin awaitB时间为" + System.currentTimeMillis()
                    + " ThreadName=" + Thread.currentThread().getName());
            conditionB.await();
            System.out.println("  end awaitB时间为" + System.currentTimeMillis()
                    + " ThreadName=" + Thread.currentThread().getName());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void signalAll_A() {
        try {
            lock.lock();
            System.out.println("  signalAll_A时间为" + System.currentTimeMillis()
                    + " ThreadName=" + Thread.currentThread().getName());
            conditionA.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public void signalAll_B() {
        try {
            lock.lock();
            System.out.println("  signalAll_B时间为" + System.currentTimeMillis()
                    + " ThreadName=" + Thread.currentThread().getName());
            conditionB.signalAll();
        } finally {
            lock.unlock();
        }
    }
}

public class ThreadB extends Thread {

    private MyService service;

    public ThreadB(MyService service) {
        super();
        this.service = service;
    }

    @Override
    public void run() {
        service.awaitB();
    }
}

    public static void main(String[] args) throws InterruptedException {

        MyService service = new MyService();

        ThreadA a = new ThreadA(service);
        a.setName("A");
        a.start();

        ThreadB b = new ThreadB(service);
        b.setName("B");
        b.start();

        Thread.sleep(3000);

        service.signalAll_A();

    }

依照这种形式,可以在Lock对象里面可以创建多个Condition(对象监视器)实例,实现顺序通知。

这里写图片描述

lockInterruptibly

public void lockInterruptibly()
                       throws InterruptedException
如果当前线程未被中断,则获取锁。
如果该锁没有被另一个线程保持,则获取该锁并立即返回,将锁的保持计数设置为 1。
如果当前线程已经保持此锁,则将保持计数加 1,并且该方法立即返回。
如果锁被另一个线程保持,则出于线程调度目的,禁用当前线程,并且在发生以下两种情况之一以前,该线程将一直处于休眠状态:
  ● 锁由当前线程获得;或者
  ● 其他某个线程中断当前线程。
如果当前线程获得该锁,则将锁保持计数设置为 1。
如果当前线程:
  ● 在进入此方法时已经设置了该线程的中断状态;或者
  ● 在等待获取锁的同时被中断。
则抛出 InterruptedException,并且清除当前线程的已中断状态。
在此实现中,因为此方法是一个显式中断点,所以要优先考虑响应中断,而不是响应锁的普通获取或重入获取。

synchronized与Lock在默认情况下是不会响应中断(interrupt)操作,会继续执行完。lockInterruptibly()提供了可中断锁来解决此问题。

public static void main(String[] args) throws InterruptedException {
        final MyService service = new MyService();
        Runnable runnableRef = new Runnable() {
            @Override
            public void run() {
                service.waitMethod();
            }
        };

        Thread threadA = new Thread(runnableRef);
        threadA.setName("A");
        threadA.start();
        Thread.sleep(500);
        Thread threadB = new Thread(runnableRef);
        threadB.setName("B");
        threadB.start();
        threadB.interrupt();// 打标记
        System.out.println("main end!");
    }
public class MyService {

    public ReentrantLock lock = new ReentrantLock();
    public void waitMethod() {
        try {
            try
            {
                lock.lockInterruptibly();

            System.out
                    .println("lock begin " + Thread.currentThread().getName());
            for (int i = 0; i < Integer.MAX_VALUE / 10; i++) {
                String newString = new String();
                Math.random();
            }
            System.out
                    .println("lock   end " + Thread.currentThread().getName());
            }
            catch (InterruptedException e)
            {
                System.out
                .println("线程进入异常 " + Thread.currentThread().getName()); 
                e.printStackTrace();
            };
        } finally {
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }

这里写图片描述

以下方法多用于辅助测试,就不一一介绍了

getQueuedThreads

protected Collection<Thread> getQueuedThreads()
返回一个 collection,它包含可能正等待获取此锁的线程。因为在构造此结果的同时实际的线程 set 可能动态地变化,所以返回的 collection 仅是尽力的估计值。所返回 collection 中的元素没有特定的顺序。此方法用于加快子类的构造速度,以提供更多的监视设施以下方法多用于辅助测试,就不一一介绍了
。
返回:
线程的 collection

hasWaiters

public boolean hasWaiters(Condition condition)
查询是否有些线程正在等待与此锁有关的给定条件。注意,因为随时可能发生超时和中断,所以返回 true 并不保证将来某个 signal 将唤醒线程。此方法主要用于监视系统状态。
参数:
condition - 条件
返回:
如果有任何等待的线程,则返回 true
抛出:
IllegalMonitorStateException - 如果没有保持此锁
IllegalArgumentException - 如果给定 condition 与此锁无关
NullPointerException - 如果 condition 为 null

getWaitQueueLength

public int getWaitQueueLength(Condition condition)
返回等待与此锁相关的给定条件的线程估计数。注意,因为随时可能发生超时和中断,所以只能将估计值作为实际等待线程数的上边界。此方法用于监视系统状态,不用于同步控制。
参数:
condition - 条件
返回:
等待线程的估计数
抛出:
IllegalMonitorStateException - 如果没有保持此锁
IllegalArgumentException - 如果给定 condition 与此锁无关
NullPointerException - 如果 condition 为 null

getWaitingThreads

protected Collection<Thread> getWaitingThreads(Condition condition)
返回一个 collection,它包含可能正在等待与此锁相关给定条件的那些线程。因为在构造此结果的同时实际的线程 set 可能动态地变化,所以返回 collection 的元素只是尽力的估计值。所返回 collection 中的元素没有特定的顺序。此方法用于加快子类的构造速度,提供更多的条件监视设施。
参数:
condition - 条件
返回:
线程的 collection
抛出:
IllegalMonitorStateException - 如果没有保持此锁
IllegalArgumentException - 如果给定 condition 与此锁无关
NullPointerException - 如果 condition 为 null

hasQueuedThreads

public final boolean hasQueuedThreads()
查询是否有些线程正在等待获取此锁。注意,因为随时可能发生取消,所以返回 true 并不保证有其他线程将获取此锁。此方法主要用于监视系统状态。

hasQueuedThread

public final boolean hasQueuedThread(Thread thread)
查询指定的线程是否正在等待获取此锁。注意,因为随时可能发生取消,所以返回 true 并不保证此线程将获取此锁。此方法主要用于监视系统状态。



版权声明:本文为zhang199416原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。