详解并发编程
最近学习了:
冰河《深入理解高并发编程》
;
《并发编程的艺术》
;
特此简要对学习做了部分总结,方便后续对并发编程知识的完善和巩固;
若想深入了解学习,可阅读上述参考原著;
线程与线程池
进程
进程是系统进行资源分配的基本单位。现代操作系统在运行一个程序时,会为其创建一个进程。
进程也就是应用程序在内存中分配的空间,也就是正在运行的程序,各个进程之间互不干扰
使用进程+CPU时间片轮转方式的操作系统,在宏观上同一时间段处理多个任务,换句话说,进程让操作系统的并发成为了可能。虽然并发在宏观上看,有多个任务在执行,但事实上,对单核CPU而言,任意具体时刻都只有一个任务在占用CPU资源
线程
线程是CPU调度的基本单位。是比进程更小的可单独运行的单位
在一个进程中,可以创建多个线程,这些线程可以拥有各自私有的计数器、栈内存和局部变量,并且能够访问共享的内存变量。
多线程
在同一程序中,可同时运行多个线程来执行不同的任务;这些线程可同时运用CPU多个核心来运行
为什么使用多线程?最主要的是:多线程编程可最大限度的利用多核CPU资源。
上下文切换
上下文切换是指CPU从一个进程(或线程)切换到另一个进程(或线程)。
上下文是指某一时刻CPU寄存器和程序计数器的内容
上下文切换通常是计算密集型的,意味着此操作会消耗大量的CPU时间,故
线程也不是越多越好
。如何减少系统中上下文切换次数,是提升多线程性能的一个重要课题
线程的实现方式
-
继承Thread类
public class ThreadDemo { public static class TestThread extends Thread { @Override public void run() { System.out.println("extends Thread"); } } public static void main(String[] args) { TestThread testThread = new TestThread(); testThread.start(); } //console:extends Thread }
-
实现Runnable接口
public class ThreadDemo { public static class TestThreadRun implements Runnable { @Override public void run() { System.out.println("extends Thread2"); } } public static void main(String[] args) { Thread thread = new Thread(new TestThreadRun()); thread.start(); } //console:extends Thread2 }
-
实现Callable接口
public class ThreadDemo { public static class TestThreadCall implements Callable<String> { @Override public String call() { System.out.println("extends Thread2"); return "result:do success"; } } public static void main(String[] args) { TestThreadCall call = new TestThreadCall(); String result = call.call(); System.out.println(result); } //console: // extends Thread2 //result:do success }
线程的优先级
在Java中,通过一个整型的成员变量priority来控制线程优先级,范围从1~10,值越大优先级越高,默认为5。
优先级高的线程在分配CPU时间片时,分配概率高于低优先级的线程。
注意:
线程优先级不能作为程序正确性的依赖
(高优先级的不保证一定早于低优先级的执行)
线程的执行顺序
在同一个方法中,连续创建多个线程后,调用线程的start()方法的顺序,并不能决定线程的执行顺序
如何确保线程的执行顺序?
-
可以使用Thread类中的join()方法来确保线程的执行顺序。
join()实际上让主线程等待当前子线程执行完成
。 - join()方法内部会调用本地wait()方法,调用线程wait()方法时,会使主线程处于等待状态,等待子线程执行完毕后再执行
线程的生命周期
线程生命周期中几个重要状态:
-
NEW:线程被创建,但还没有调用start()方法;
-
RUNNABLE:可运行状态,包含就绪状态、运行中状态;
-
BLOCKED:阻塞状态,这状态的线程,须要等待其他线程释放锁或者等待进入synchronized;
-
WAITTING:等待状态,此状态的线程,需要等待其他线程唤醒或中断操作,方可进入下一状态;
调用如下三个方法,会让线程进入等待状态:
- Object.wait():使当前线程处于等待状态,直到另一个线程唤醒它
- Thread.join():等待线程执行完毕,底层调用的是Object的wait()方法
- LockSupport.park():除非获得调用许可,否则禁用当前线程进行线程调度
-
TIME_WAITTING:超时等待状态,可在规定时间后,自行返回;
-
TERMINATED:终止状态,表示线程执行完毕;
线程生命周期流程图(简略):
注意:
使用jps结合jstack命令可线程分析生产环境Java线程异常信息
深入理解Thread类
Thread类定义
public class Thread implements Runnable {...}
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
Thread类实现了Runnable接口,而Rnnnable接口仅有一个run()方法,并被@FunctionalInterface注解修饰
加载本地资源
/* Make sure registerNatives is the first thing <clinit> does. */
private static native void registerNatives();
static {
registerNatives();
}
此方法主要用来加载一些本地资源,并在静态代码块中调用此本地方法
Thread成员变量
//线程名称
private volatile String name;
//优先级
private int priority;
private Thread threadQ;
private long eetop;
/* Whether or not to single_step this thread. */
//是否单步线程
private boolean single_step;
/* Whether or not the thread is a daemon thread. */
//是否守护线程
private boolean daemon = false;
/* JVM state */
private boolean stillborn = false;
/* What will be run. */
//实际执行体
private Runnable target;
/* The group of this thread */
private ThreadGroup group;
/* The context ClassLoader for this thread */
private ClassLoader contextClassLoader;
/* The inherited AccessControlContext of this thread */
//访问控制上下文
private AccessControlContext inheritedAccessControlContext;
/* For autonumbering anonymous threads. */
//为匿名线程生成名称的编号
private static int threadInitNumber;
private static synchronized int nextThreadNum() {
return threadInitNumber++;
}
/* ThreadLocal values pertaining to this thread. This map is maintained
* by the ThreadLocal class. */
//与此线程相关的ThreadLocal
ThreadLocal.ThreadLocalMap threadLocals = null;
/*
* InheritableThreadLocal values pertaining to this thread. This map is
* maintained by the InheritableThreadLocal class.
*/
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
//当前线程请求的堆栈大小
private long stackSize;
//线程终止后存在的JVM私有状态
private long nativeParkEventPointer;
//线程ID
private long tid;
/* For generating thread ID */
//用于生成线程ID
private static long threadSeqNumber;
/* Java thread status for tools,
* initialized to indicate thread 'not yet started'
*/
//线程状态,初始为0,表示未启动
private volatile int threadStatus = 0;
/**
* The argument supplied to the current call to
* java.util.concurrent.locks.LockSupport.park.
* Set by (private) java.util.concurrent.locks.LockSupport.setBlocker
* Accessed using java.util.concurrent.locks.LockSupport.getBlocker
*/
volatile Object parkBlocker;
/* The object in which this thread is blocked in an interruptible I/O
* operation, if any. The blocker's interrupt method should be invoked
* after setting this thread's interrupt status.
*/
//Interruptible中定义了中断方法,用来中断特定线程
private volatile Interruptible blocker;
//当前线程的内部锁
private final Object blockerLock = new Object();
//线程最小优先级
public final static int MIN_PRIORITY = 1;
//默认优先级
public final static int NORM_PRIORITY = 5;
//最大优先级
public final static int MAX_PRIORITY = 10;
从成员变量可以看出,Thread类不是一个任务,而是一个实实在在的线程对象,其内部Runnable类型的target变量,才是实际执行的任务
线程的状态定义
public enum State {
/**
* Thread state for a thread which has not yet started.
*/
//新建状态;线程被创建,但是还没有调用start()方法
NEW,
/**
* Thread state for a runnable thread. A thread in the runnable
* state is executing in the Java virtual machine but it may
* be waiting for other resources from the operating system
* such as processor.
*/
//可运行状态;包括运行中状态和就绪状态
RUNNABLE,
//阻塞状态;此状态的线程需要等待其他线程释放锁,或者等待进入synchronized
BLOCKED,
//等待状态;此状态的线程需要其他线程对其进行唤醒或者中断状态,进而进入下一状态
WAITING,
//超时等待状态;可以在一定的时间自行返回
TIMED_WAITING,
/**
* Thread state for a terminated thread.
* The thread has completed execution.
*/
//终止状态;当前线程执行完毕
TERMINATED;
}
Thread类的构造方法
public Thread() {
init(null, null, "Thread-" + nextThreadNum(), 0);
}
public Thread(Runnable target) {
init(null, target, "Thread-" + nextThreadNum(), 0);
}
public Thread(ThreadGroup group, Runnable target) {
init(group, target, "Thread-" + nextThreadNum(), 0);
}
public Thread(String name) {
init(null, null, name, 0);
}
public Thread(ThreadGroup group, String name) {
init(group, null, name, 0);
}
public Thread(ThreadGroup group, Runnable target, String name) {
init(group, target, name, 0);
}
通过几个常用的Thread类的构造方法,发现Thread类的初始化,主要是通过init()方法来实现
init()方法
/**
* Initializes a Thread.
*
* @param g the Thread group
* @param target the object whose run() method gets called
* @param name the name of the new Thread
* @param stackSize the desired stack size for the new thread, or
* zero to indicate that this parameter is to be ignored.
* @param acc the AccessControlContext to inherit, or
* AccessController.getContext() if null
* @param inheritThreadLocals if {@code true}, inherit initial values for
* inheritable thread-locals from the constructing thread
*/
private void init(ThreadGroup g, Runnable target, String name,
long stackSize, AccessControlContext acc,
boolean inheritThreadLocals) {
if (name == null) {
//名称为空
throw new NullPointerException("name cannot be null");
}
this.name = name;
Thread parent = currentThread();
//安全管理器
SecurityManager security = System.getSecurityManager();
if (g == null) {
/* Determine if it's an applet or not */
/* If there is a security manager, ask the security manager
what to do. */
if (security != null) {
//获取线程组
g = security.getThreadGroup();
}
/* If the security doesn't have a strong opinion of the matter
use the parent thread group. */
if (g == null) {
//线程组为空,从父类获取
g = parent.getThreadGroup();
}
}
/* checkAccess regardless of whether or not threadgroup is
explicitly passed in. */
g.checkAccess();
/*
* Do we have the required permissions?
*/
if (security != null) {
if (isCCLOverridden(getClass())) {
security.checkPermission(SUBCLASS_IMPLEMENTATION_PERMISSION);
}
}
g.addUnstarted();
//当前线程继承父线程相关属性
this.group = g;
this.daemon = parent.isDaemon();
this.priority = parent.getPriority();
if (security == null || isCCLOverridden(parent.getClass()))
this.contextClassLoader = parent.getContextClassLoader();
else
this.contextClassLoader = parent.contextClassLoader;
this.inheritedAccessControlContext =
acc != null ? acc : AccessController.getContext();
this.target = target;
setPriority(priority);
if (inheritThreadLocals && parent.inheritableThreadLocals != null)
this.inheritableThreadLocals =
ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
/* Stash the specified stack size in case the VM cares */
this.stackSize = stackSize;
/* Set thread ID */
tid = nextThreadID();
}
Thread类的构造方法,是被创建Thread线程的主线程调用的,此时调用构造方法的主线程就是Thread的父线程;在init()方法中,新创建的Thread线程,会继承父线程的部分属性
run()方法
@Override
public void run() {
if (target != null) {
target.run();
}
}
可以看到,Thread方法的run()方法实现比较简单,实际是由Runnable类型的target来运行run()方法实现的;
须注意的是:直接调用Runnable接口的run方法,不会创建新线程来执行任务;如果需要创建新线程执行任务,则需要调用Thread类的start()方法;
start()方法
public synchronized void start() {
/**
* This method is not invoked for the main method thread or "system"
* group threads created/set up by the VM. Any new functionality added
* to this method in the future may have to also be added to the VM.
*
* A zero status value corresponds to state "NEW".
*/
if (threadStatus != 0)
throw new IllegalThreadStateException();
/* Notify the group that this thread is about to be started
* so that it can be added to the group's list of threads
* and the group's unstarted count can be decremented. */
group.add(this);
//标记线程是否启动
boolean started = false;
try {
//调用本地方法启动
start0();
//变更线程启动标识
started = true;
} finally {
try {
if (!started) {
//未启动,则线程组中标记为启动失败
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}
从源码可看出,start()方法被synchronized的修饰,说明此方法是同步的,它会在线程实际启动前,检查线程的状态,如果不是0(NEW状态),则直接返回异常;
所以一个线程只能启动一次,多次启动是会报异常的
;
调用start()方法后,新创建的线程就会处于就绪状态(如果没有被CPU调度),当CPU有空闲时,会被CPU调度执行,此时线程为运行状态,JVM会调用线程的run()方法来执行任务
sleep()方法
public static native void sleep(long millis) throws InterruptedException;
public static void sleep(long millis, int nanos) throws InterruptedException {
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
if (nanos < 0 || nanos > 999999) {
throw new IllegalArgumentException(
"nanosecond timeout value out of range");
}
if (nanos >= 500000 || (nanos != 0 && millis == 0)) {
millis++;
}
sleep(millis);
}
sleep()方法会让线程休眠一段时间,需注意的是,调用sleep()方法使线程休眠后,不会释放锁
join()方法
join()方法的使用场景,通常是启动线程执行任务的线程,调用执行线程的join()方法,等待执行线程执行任务,直到超时或者执行线程终止
interrupt()方法
线程中断:
在某些情况下,我们启动线程之后,发现并不需要运行它,需要中断此线程。目前在JAVA中还没有安全直接的方法来停止线程,但是JAVA引入了线程中断机制来处理需要中断线程的情况。
线程中断机制是一种协作机制。需要注意通过中断操作,并不能直接终止线程运行,而是通知被中断的线程自行处理。
Thread类中提供了几个方法,用来处理线程中断:
- Thread.interrupt():中断线程。这里的中断线程不会立即终止线程运行,而是将线程中断标识设置为true.
- Thread.currentThread.isInterrupt():测试当前线程是否被中断。线程的中断状态受这个方法的影响,意思是调用一次使线程的中断状态设置为true,连续调用两次会使该线程中断状态重新设置为false.
- Thread.isInterrupt():测试当前线程是否被中断。与上面方法不同的是,此方法不会改变线程中断状态。
此方法用来中断当前线程执行,它通过设置线程的中断标志位来中断当前线程。此时,如果为线程设置了中断标志位,
可能会抛出InterruptException异常,同时会清除当前线程的中断状态。这种中断线程的方式比较安全,它能使正在执行的任务可以继续执行完成,而不像stop那样强制线程关闭。
public void interrupt() {
if (this != Thread.currentThread())
checkAccess();
synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // Just to set the interrupt flag
b.interrupt(this);
return;
}
}
interrupt0();
}
过期的suspend()/resume()/stop()
suspend()/resume()/stop()三个方法,可简单理解为对线程的暂停、恢复和终止操作,由于方法已过时,不推荐使用。
Callable和Futrue
Callable接口
Callable接口可以获取线程执行后的返回结果;而继承Thread和实现Runnable接口,无法获取执行结果
@FunctionalInterface
public interface Runnable {
/**
* When an object implementing interface <code>Runnable</code> is used
* to create a thread, starting the thread causes the object's
* <code>run</code> method to be called in that separately executing
* thread.
* <p>
* The general contract of the method <code>run</code> is that it may
* take any action whatsoever.
*
* @see java.lang.Thread#run()
*/
public abstract void run();
}
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
可以看出,Callable接口和Runnable接口类似,同样是只有一个方法的函数式接口;不同的是Callable提供的方法有返回值,而且支持泛型。
Callable一般是配合线程次工具ExecutorService类使用,我们会在后续章节深入线程池的使用。
这里只介绍ExecutorService可以调用submit方法来执行一个Callable,并返回一个Future,后续程序可通过Future的get方法获取执行的结果。
public class TestTask implements Callable<String> {
@Override
public String call() throws Exception {
//模拟程序执行需要一秒
Thread.sleep(1000);
return "do success!";
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
TestTask testTask = new TestTask();
Future<String> submitRes = executorService.submit(testTask);
//注意调用get方法会阻塞当前线程,知道得到结果
//实际编码中建议使用设有超时时间的重载get方法
String reslut = submitRes.get();
System.out.println(reslut);
}
//console:
// do success!
}
异步模型
-
无返回结果的异步模型
无返回结果的异步任务,可直接丢进线程或线程池中运行,此时无法直接获取任务执行结果;一种方式是可以通过回调方法来获取运行结果;实现方式类似观察者模式
-
有返回结果的异步模型
在JDK中提供可直接获取返回异步结果的方案:
-
使用Future获取结果
使用Futrue接口往往配合线程池来获取异步结果
-
使用FutrueTask获取结果
FutureTask类即可结合Thread类使用、也可配合线程池使用
-
Future接口
-
Future接口
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeotException;
-
boolean cancel(boolean)
- 取消任务的执行,接收一个boolean类型参数,成功取消返回true,否者返回false.
- 任务已完成、已结束或不可取消,返回false,表示取消失败
- 若任务未启动,调用此方法返回true,表示取消成功
- 若任务已启动,会根据输入boolean参数决定,是否通过中断线程来取消任务运行
-
boolean isCancelled()
- 判断任务在完成之前取消
- 若任务完成前取消,返回true,否则返回false
- 注意:只有在任务未启动、完成前取消,才会返回true,否则都会返回false
-
boolean isDone()
- 判断任务是否已经完成
- 若任务正常结束、抛出异常退出、被取消,都会返回true,表示任务完成
-
V get()
- 任务完成时,直接返回任务的结果数据
- 未完成时,等待任务完成并返回结果
-
V get(long,TimeUnit)
- 任务完成时,直接返回任务完成结果
- 未完成时,会在超时时间内等过返回结果,超过时间,抛出TimeOutException异常
-
boolean cancel(boolean)
-
RunnableFuture接口
public interface RunnableFuture<V> extends Runnable, Future<V> { /** * Sets this Future to the result of its computation * unless it has been cancelled. */ void run(); }
RunnabeleFuture接口不但继承了Future接口,同时继承了Runnable接口,因此同时兼具两者的抽象接口
-
FutureTask类
public class FutureTask<V> implements RunnableFuture<V> { //详细源码,后续分析 }
FutureTask类是RunnableFuture接口非常重要的实现类,他实现了RunnableFuture接口、Future接口和Runnnable接口的所有抽象方法
-
FutureTask类中的变量与常量
首先定义了一个volatile修饰的变量state,volatile变量通过内存屏障和禁止重排序来实现线程安全;
然后定义几个任务运行时的状态常量;
(代码注释中,给出几种可能的状态变更情况)
/* Possible state transitions: * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED */ private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6;
接下来,又定义了几个成员变量;
/** The underlying callable; nulled out after running */ private Callable<V> callable; /** The result to return or exception to throw from get() */ private Object outcome; // non-volatile, protected by state reads/writes /** The thread running the callable; CASed during run() */ private volatile Thread runner; /** Treiber stack of waiting threads */ private volatile WaitNode waiters;
- callable: 通过调用run()方法来执行具体任务;
- outcaom:通过get()方法获取到的返回结果或异常信息
- runner:用来运行callable接口的线程,并通过CAS来保证线程安全
- waiters:等待线程的堆栈,派生类中,会通过CAS和此堆栈来实现运行状态的切换
-
构造方法
两种不同传参的构造方法;
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } /** * Creates a {@code FutureTask} that will, upon running, execute the * given {@code Runnable}, and arrange that {@code get} will return the * given result on successful completion. * * @param runnable the runnable task * @param result the result to return on successful completion. If * you don't need a particular result, consider using * constructions of the form: * {@code Future<?> f = new FutureTask<Void>(runnable, null)} * @throws NullPointerException if the runnable is null */ public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
-
isCancelled()和isDone()
public boolean isCancelled() { return state >= CANCELLED; } public boolean isDone() { return state != NEW; }
这两个方法中,都是通过判断状态值的大小来判断是否已取消或者完成的;
这里可以学习的是,今后定义状态值时,尽量遵循一定的变化规律来定义,这样对于状态频繁变更的场景,规律的状态值,进行业务逻辑时可能起到事半功倍的效果
-
cancel(boolean)方法
public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); } return true; }
首先会根据状态判断或者CAS操作结果,来快速判断是否可取消;若状态不为NEW或CAS返回false,则直接返回取消失败;
然后在try代码块中,先判断是否可中断来取消;若可以,则定义一个引用指向运行的任务,并判断任务是否为空,不为空则调用中断方法,然后修改运行状态为取消;
最后调用finally代码块中的finishCompletion()方法来结束任务运行;
/** * Removes and signals all waiting threads, invokes done(), and * nulls out callable. */ private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }
在finishCompletion()方法中,先定义了一个for循环,循环waiters(线程等待堆栈),循环终止条件为waiters为空;具体循环内部,先判断CAS操作是否成功,若成功,则定义新一自旋循环;在自旋循环中,会唤醒堆栈中的线程,使其运行完成,完成之后break跳出循环,最后调用done()方法并置空callable;
-
get()方法
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); }
无参的get()方法,在任务未运行完成时,会阻塞等待任务完成;有参的get()方法,会阻塞等待完成,但超过给定时长后,会抛出TimeoutException异常;
/** * Awaits completion or aborts on interrupt or timeout. * * @param timed true if use timed waits * @param nanos time to wait, if timed * @return state upon completion */ private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } }
awaitDone()方法主要就是等待执行的完成或中断;
其中最重要的就是for自旋循环,循环中会先判断是否中断,若中断,则调用removeWaiter()移除等待堆栈,抛出中断异常;若未中断,则往下执行判断是否完成逻辑;
-
set()和setException()
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } } protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); finishCompletion(); } }
两个方法逻辑几乎相同,只是一个在设定任务状态时设置为NORMAL,一个设置为EXCEPTIONAL;
-
run()和runAndReset()
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
可以说使用使用了Future或FutureTask(),就必然会调用run()方法来运行任务;
run()方法中,会先判断是否为NEW状态或者CAS操作返回false,将直接返回,不再继续执行;
接下来try代码块中,则是执行callable的call()方法来运行,并接收结果;
-
removeWaiter()方法
private void removeWaiter(WaitNode node) { if (node != null) { node.thread = null; retry: for (;;) { // restart on removeWaiter race for (WaitNode pred = null, q = waiters, s; q != null; q = s){ s = q.next; if (q.thread != null) pred = q; else if (pred != null) { pred.next = s; if (pred.thread == null) // check for race continue retry; } else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) continue retry; } break; } } }
此方法主要是通过自旋循环的方式,移除WaitNode(等待堆栈)中的线程;
-
深度解析ThreadPoolExecutor
Java的线程次技术是Java最核心的技术之一,在Java高并发领域中,是一个永远绕不开的话题
Thread直接创建线程的不足
- 每次new Thread()都新建线程,无复用,性能差;
- 线程缺乏统一管理,可能无限制的新建线程,可能占用大量资源导致OOM或者死机;
- 缺少更多的管控操作,如更多执行、定期执行、中断等;
使用线程池的好处
- 可复用已有的线程,减少新建线程带来的开销,提升性能;
- 可有效控制最大并发数,提升资源利用率,同时减少过多线程导致的资源竞争;
- 提供定时执行、定期执行、单线程、并发数控制等功能;
- 提供支持线程池监控的方法,可实时监控线程池运行情况;
线程池
-
Executors
- newCachedThreadPool:创建一个可缓冲的线程池,若线程池的大小超出了处理需要,可以灵活回收空闲线程,若无线程可回收,则新建线程;
- newFixedThreadPool:创建一个定长的线程池,可以控制最大线程并发数,超过定长的线程将在队列中等待;
- newScheduledThreadPool:创建一个定长的线程池,可以定时、定期性的执行;
- newSingleThreadPool:创建一个单线程化的线程池,使用唯一的线程执行线程任务,可保证所有任务按照指定顺序执行;
- newSingleScheduleThreadPool:创建一个单线程化的线程池,可定时、定期性的执行;
- newWorkStealingThreadPool:创建一个具有并行级别的work-stealing线程池;
线程池实例的几种状态
-
Running
运行状态,能接受新提交的任务,也能处理阻塞队列中的任务
-
Shutdown
关闭状态,不在接收新提交任务,但可以处理阻塞队列中已保存的任务;
Running –> shutdown() –> Shutdown
-
Stop
停止状态,不能接收新任务,也不能处理阻塞队列中的任务,会中断正在处理中的任务;
Running/Shutdown –> shutdownNow() –> Stop
-
Tidying
清空状态,所有的任务都已经终止,有效线程数为0(阻塞队列中的线程数为0,线程池中执行的线程数也为0)
-
Terminated
结束状态,Tidying –> terminate() –> Terminated
注意:无须对线程池状态做特殊处理,线程池状态是线程池内部根据方法自行定义和处理的
合理配置线程数的建议
- CPU密集型任务,需要压榨cpu,线程数可设置为ncpu+1(cpu数量+1)
- IO密集型任务,数量可设置为ncpu*2(2倍的cpu数量)
线程池核心类之ThreadPoolExecutor
-
构造方法
/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters and default thread factory and rejected execution handler. * It may be more convenient to use one of the {@link Executors} factory * methods instead of this general purpose constructor. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} is null */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
此为参数最多的构造方法,其他构造都以此重载
- corePoolSize:核心线程数量
- maximumPoolSize:最大线程数量
- workQueue:阻塞队列,存储等待执行的任务
上述三个参数关系如下:
- 若运行的线程数小于corePoolSize,直接新建线程运行;即使线程池中的其他线程是空闲的
- 若运行的线程数大于等于corePoolSize,且小于maximumPoolSize,此时多出的数量,会进入workQueue等待,只有workQueue满时,才会新建线程
- 若corePoolSize等于maximumPoolSize,那此时线程池大小固定,若有新任务提交,若workQueue未满,则进入workQueue,等待有空闲线程时,从workQueue取出执行
- 若运行的线程数超过了maximumPoolSize,且workQueue已满,则会通过拒绝策略rejectHandler来处理策略
基于上述参数,线程次会对任务进行如下处理:
当提交一个新任务到线程池,线程池会根据当前运行的线程数,来执行不同的处理方式;主要有三种处理方式:直接切换、使用无界队列、使用有界队列
- 直接切换常使用的队列就是SynchronousQueue
- 使用无限队列,就是使用基于链表的队列。例如,LinkedBlockingQueue,使用此队列时,线程池中创建的最大线程数就是corePoolSize,maximumPoolSize将不会起作用
- 使用有界队列,就是使用基于数组的队列。例如,ArrayBlockingQueue,使用这种方式,可将线程池最大线程数限制为maximumPoolSize,可降低资源的消耗;但这种方式使得线程次对线程的调度更困难了,因为线程池数和队列数都是固定的了
根据上述参数,可以简单得出降低资源消耗的一些措施:
- 若想降低系统资源消耗、上下文切换开销等,可以设置一个较大的队列容量和较小的线程池容量,这样会降低线程处理任务的吞吐量
- 若提交的任务经常阻塞,可以重新设置较大的线程次最大线程数
-
keepAliveTime:线程没有执行任务时,最多等待多久时间终止
当线程池中的线程数,超过corePoolSize时,若没有新任务提交,超出核心线程数的线程,不会立即销毁,会在等待keepAliveTime时间后销毁
-
unit:keepAliveTime的时间单位
-
threadFactory:线程工厂,用来创建线程
缺省时会默认创建一个线程工厂,默认的工厂新建的线程具有相同的优先级,且是非守护的,同时也设置了线程名称
-
rejectHandler:拒绝策略
若workQueue满了,且没有空闲线程,则会执行拒绝策略
线程池共提供了四种拒绝策略:
- 直接抛出异常,这也是默认的策略。实现类为AbortPolicy
- 用调用者所在线程来执行,实现类为CallerRunsPolicy
- 丢弃队列中最靠前的任务,并执行当前任务,实现类为DiscardOldestPolicy
- 直接丢弃当前任务,实现类为DiscardPolicy
-
启动和停止的方法
- execute():提交任务,交给线程池执行
- submit():提交任务,可以返回结果,相当于execute+Future
- shutDown():关闭线程池,等待线程执行完毕
- shutDownNow():立即关闭线程池,不等待线程执行完毕
-
适用于监控的方法
- getTaskCount():获取线程已执行和未执行的总数
- getCompletedTaskCount():获取已执行完成的线程数
- getCorePoolSize():获取核心线程数
- getActiveCount():或取运行中线程数
深度解析线程池中顶层接口和抽象类
接口和抽象类总览
- Executor接口:线程池的最顶层接口,提供了一个无返回值的提交任务方法
- ExecutorService:继承自Executor,扩展了很多功能,例如关闭线程池、提交任务并返回结果、唤醒线程池中的任务等
- AbstractExecutotService:继承自ExecutorService,实现了一些非常实用的方法,供子类调用
- ScheduledExecutorService:继承自ExecutorService,扩展了定时任务相关的方法
Executor接口
public interface Executor {
void execute(Runnable command);
}
Executor接口比较简单,提供了一个执行提交任务的方法execute(Runnable command)
ExecutorService接口
ExecutorService接口是非定时任务类线程池的核心接口,通过此接口执行向线程池提交任务(支支持有返回和无返回两种方式)、关闭线程池、唤醒线程任务等。
public interface ExecutorService extends Executor {
//关闭线程池,不再接受新提交的任务,但之前提交的任务继续执行,直到完成
void shutdown();
//立即关闭线程池,不再接受新提交任务,会尝试停止线程池中正在执行的任务
List<Runnable> shutdownNow();
//判断线程池是否已经关闭
boolean isShutdown();
//判断线程中所有任务是否已经结束,只有调用shutdown()或shutdownNow()之后,调用此方法才返回true
boolean isTerminated();
//等待线程中所有任务执行结束,并设置超时时间
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
//提交一个Callable类型的任务,并返回一个Future类型的结果
<T> Future<T> submit(Callable<T> task);
//提交一个Runnable类型任务,并设置泛型接收结果数据,返回一个Futrue类型结果
<T> Future<T> submit(Runnable task, T result);
//提交一个Runnable类型任务,并返回一个Future类型结果
Future<?> submit(Runnable task);
//批量提交Callable类型任务,并返回他们的执行结果,Task列表和Future列表一一对应
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
//批量提交Callable类型任务,并获取返回结果,并限定处理所有任务的时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
//批量提交任务,并获得一个已经成功执行任务的结果
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
//批量提交任务,并获得一个已经完成任务的结果,并限定处理任务时间
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
AbstractExecutorEservice
此类是一个抽象类,继承自ExecutorEservice,并在此基础上,实现了一些实用方法,供子类调用。
1. newTaskFor()
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
FutureTask用来获取运行结果,实际应用中,我们经常使用的是它的子类FutureTask;
newTaskFor方法的作用,就是将任务封装成FutureTask对象,后续将FutureTask对象提交到线程池
2. doInvokeAny()
此方法是批量执行线程池任务,最终返回一个结果数据的核心方法;此方法只要获取到其中一个线程的结果,就会取消线程池中其他运行的线程;
3. invokeAny()
此方法内部仍是调用doInvokeAny()方法,提交批量线程,其中有一个完成返回结果,其余的线程都取消运行;
4. invokeAll()
invokeAll()方法实现了有超时设置和无超时设置的逻辑;
无超时设置的方法逻辑是:将提交的批量任务,都封装成RunnableFuture对象,然后调用execute()方法执行任务,并将结果Future添加到Future集合中,之后会对Future集合进行遍历,判断任务是执行完成;若未完成,则会调用get方法阻塞,直到获取结果,此时会忽略异常;最后在finally中,判断所有任务的完成标识,若未完成,则取消执行;
5. submit()
此方法较简单,就是将任务封装成RunnableFuture对象,调用execute()执行完成后,返回Future结果
ScheduleExecutorService
此接口继承自ExecutorService,除了继承父类的方法外,还提供了定时处理任务的功能
源码角度分析创建线程池方式
Executors.newWorkStealingPool:此方法是Java8中新增的创建线程池的方法,它能够为线程设置并行级别,并拥有更高的并发度和性能;除此之外,其他的创建线程池的方法,都是调用的ThreadPoolExecutor的构造方法;
ThreadPoolExecutor创建线程池
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
通过查看ThreadPoolExecutor类的源码,发现最终都通过调用异常构造方法来构造线程次,其他初始化参数前面已经介绍过;
ForkJoinPool类创建线程池
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
从上述源码可以看出,Executors.newWorkStealingPool()方法,构造线程池时,实际是调用ForkJoinPool来构造的线程池;
/**
* Creates a {@code ForkJoinPool} with the given parameters, without
* any security checks or parameter validation. Invoked directly by
* makeCommonPool.
*/
private ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
int mode,
String workerNamePrefix) {
this.workerNamePrefix = workerNamePrefix;
this.factory = factory;
this.ueh = handler;
this.config = (parallelism & SMASK) | mode;
long np = (long)(-parallelism); // offset ctl counts
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}
查看源码得知,ForkJoinPool各种构造方法,最终调用的是上述的私有构造方法;
其中各初始化参数如下:
- parallelism:并发级别
- factory:创建线程的工厂
- handler:当线程次中的线程抛出未捕获的异常时,通过此UncaughtExceptionHandler进行处理
- mode:取值表示FIFO_QUEUE或LIFO_QUEUE
- workerNamePrefix:执行任务的线程名称前缀
ScheduledThreadPoolExecutor创建线程池
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,查看其源码可知,其构造方法本质还是调用ThreadPoolExecutor的构造方法,只不过队列传递的为DelayedWorkQueue。
源码解析ThreadPoolExecutor如何正确运行
ThreadPoolExecutor中的重要属性
-
ctl相关的属性
AomaticInteger类型的常量ctl是贯穿线程池整个生命周期
//主要用来保存线程状态和线程数量,前3位保存线程状态,低29位报存线程数量 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //线程池中线程数量(32-3) private static final int COUNT_BITS = Integer.SIZE - 3; //线程池中的最大线程数量 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits //线程池的运行状态 private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl //获取线程状态 private static int runStateOf(int c) { return c & ~CAPACITY; } //获取线程数量 private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; } /* * Bit field accessors that don't require unpacking ctl. * These depend on the bit layout and on workerCount being never negative. */ private static boolean runStateLessThan(int c, int s) { return c < s; } private static boolean runStateAtLeast(int c, int s) { return c >= s; } private static boolean isRunning(int c) { return c < SHUTDOWN; } /** * Attempts to CAS-increment the workerCount field of ctl. */ private boolean compareAndIncrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect + 1); } /** * Attempts to CAS-decrement the workerCount field of ctl. */ private boolean compareAndDecrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect - 1); } /** * Decrements the workerCount field of ctl. This is called only on * abrupt termination of a thread (see processWorkerExit). Other * decrements are performed within getTask. */ private void decrementWorkerCount() { do {} while (! compareAndDecrementWorkerCount(ctl.get())); }
-
其他重要属性
//用于存放任务的阻塞队列 private final BlockingQueue<Runnable> workQueue; //可重入锁 private final ReentrantLock mainLock = new ReentrantLock(); /** * Set containing all worker threads in pool. Accessed only when * holding mainLock. */ //存放线程池中线程的集合,访问这个集合时,必须先获得mainLock锁 private final HashSet<Worker> workers = new HashSet<Worker>(); /** * Wait condition to support awaitTermination */ //在锁内部阻塞等待条件完成 private final Condition termination = mainLock.newCondition(); //线程工厂,以此来创建新线程 private volatile ThreadFactory threadFactory; /** * Handler called when saturated or shutdown in execute. */ //拒绝策略 private volatile RejectedExecutionHandler handler; /** * The default rejected execution handler */ //默认的拒绝策略 private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
ThreadPoolExecutor中的重要内部类
-
Worker
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
Work类实现了Runnable接口,需要重写run()方法,而Worker的run方法本质是调用ThreadPoolExecutor的runWorker()方法
-
拒绝策略
在线程池中,如果WorkQueue队列满了,且没有空闲线程,当再提交新任务时,将会执行拒绝策略;
而线程池总共提供了四种拒绝策略:
- 直接抛出异常,也是默认的策略;实现类为AbortPolicy
- 用调用者线程来执行任务;实现类为CallerRunsPolicy
- 丢弃队列中最靠前的任务,并执行当前任务;DiscardOldestPolicy
- 直接丢弃当前任务;实现类为DiscardPolicy
在ThreadPoolExecutor中,提供了四个内部类来实现对应的策略;
public static class CallerRunsPolicy implements RejectedExecutionHandler { /** * Creates a {@code CallerRunsPolicy}. */ public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } } /** * A handler for rejected tasks that throws a * {@code RejectedExecutionException}. */ public static class AbortPolicy implements RejectedExecutionHandler { /** * Creates an {@code AbortPolicy}. */ public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } } /** * A handler for rejected tasks that silently discards the * rejected task. */ public static class DiscardPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardPolicy}. */ public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } } /** * A handler for rejected tasks that discards the oldest unhandled * request and then retries {@code execute}, unless the executor * is shut down, in which case the task is discarded. */ public static class DiscardOldestPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardOldestPolicy} for the given executor. */ public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
我们也可以通过实现RejectedExecutionHandler接口,并重写rejectedExecution()方法来自定义拒绝策略;
在创建线程次时,通过ThreadPoolExecutor的构造方法,传入我们自定义的拒绝策略;
源码解析ThreadPoolExecutor核心流程
ThreadPoolExecutor中存在一个workers工作线程集合,用户可以向线程池中添加任务,workers集合中的线程可以直接执行任务,或者从任务队列中获取任务后执行;
ThreadPoolExecutor提供了线程池从创建、执行任务、到消亡的整个流程;
ThreadPoolExecutor中,线程池的逻辑主要体现在execute(Runnable command),addWorker(Runnable firstTask, boolean core),addWorkerFailed(Worker w)等方法和拒绝策略上;接下来深入分析这个几个核心方法
execute(Runnable command)
此方法的作用是提交Runnable类型的任务到线程池中;
public void execute(Runnable command) {
if (command == null)
//若提交的任务为空,则提交空指针异常
throw new NullPointerException();
//获取线程池的状态,和线程池中线程数量
int c = ctl.get();
//若线程池中线程数小于核心线程数
if (workerCountOf(c) < corePoolSize) {
//重新开启线程执行任务
if (addWorker(command, true))
return;
c = ctl.get();
}
//若线程池处于RUNNING状态,则将任务添加到阻塞队列中
//只有线程池处于RUNNING状态时,才能添加到队列
if (isRunning(c) && workQueue.offer(command)) {
//再次获取线程次状态和线程池数量,用于二次检查
//向队列中添加线程成功,但由于其他线程可能会修改线程池状态,所以这里需要进行二次检查
int recheck = ctl.get();
//如果线程池没有再处于RUNNING状态,则从队列中删除任务
if (! isRunning(recheck) && remove(command))
//执行拒绝策略
reject(command);
else if (workerCountOf(recheck) == 0)
//若线程池为空,则新建一个线程加入
addWorker(null, false);
}
else if (!addWorker(command, false))
//任务队列已满,则新建一个Worker线程,若新增失败,则执行拒绝策略
reject(command);
}
addWorker(Runnable firstTask, boolean core)
此方法总体上可分为三部分,第一部分主要是CAS安全向线程池中添加工作线程;第二部分是添加新的工作线程;第三部分则是通过安全的并发方式,将线程添加到workers中,并启动线程执行任务
private boolean addWorker(Runnable firstTask, boolean core) {
//循环标签,重试的标识
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//检查队列是否在某些特定条件下为空
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//此循环中主要是通过CAS的方式增加线程个数
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//CAS的方式增加线程数量
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//跳出最外层循环,说明已通过CAS增加线程成功
//此时创建新的线程
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//将新建线程封装成Woker
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//独占锁,保证操作workers的同步
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//将Worker加入队列
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
//释放独占锁
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
addWorkerFailed(Worker w)
在addWorker(Runnable firstTask, boolean core)方法中,若添加工作线程失败,或工作线程启动失败,则调用此addWorkerFailed(Worker w)方法;此方法业务较简答,获取独占锁,从workers中移除任务,并通过CAS将任务数量减一,最后释放锁;
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}
-
拒绝策略
/** * Invokes the rejected execution handler for the given command. * Package-protected for use by ScheduledThreadPoolExecutor. */ final void reject(Runnable command) { handler.rejectedExecution(command, this); }
RejectExecutionHandler的四种实现类,正是线程次提供的四种拒绝策略的实现类;
此方法具体哪种策略,是根据创建线程池时,传入的参数来决定的;缺省的话则使用默认的拒绝策略;
源码解析线程池中Woker的执行流程
Woker类分析
Woker从类结构上看,继承了AQS(AbstractQueueSynchronizer)类,并实现了Runnable接口;本质上Woker类即是一个同步组件,也是一个执行任务的线程;
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
在Worker类的构造方法中可以看出,首先将同步状态state设置为-1,这是为了防止runWorker方法运行之前被中断;
这是因为如果有其他线程调用线程池中的shutdownNow()方法时,若Worker类中state > 0,则会中断线程,state为-1则不会中断线程;
Worker类实现了Runnable接口,需要重写run方法,而run方法内部实际是调用ThreadPoolExecutor的runWorker()方法;
runWorker(Worker w)
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
//释放锁,将state设置为0,允许中断
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//若任务不为空,或队列中获取的任务不为空,则进入循环
while (task != null || (task = getTask()) != null) {
//任务不为空,则先获取woker线程的独占锁
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
//若线程次已经停止,线程中断时未中断成功
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
//执行中断操作
wt.interrupt();
try {
//任务执行前置逻辑
beforeExecute(wt, task);
Throwable thrown = null;
try {
//任务执行
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//任务执行后置逻辑
afterExecute(task, thrown);
}
} finally {
//任务执行完成后,将其置空
task = null;
//已完成任务数加一
w.completedTasks++;
//释放锁
w.unlock();
}
}
completedAbruptly = false;
} finally {
//执行Worker完成退出逻辑
processWorkerExit(w, completedAbruptly);
}
}
从以上源码分析可知,当Woker从线程中获取任务为空时,会调用getTask()方法从队列中获取任务;
getTask()
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
//自旋
for (;;) {
int c = ctl.get();
//获取线程池状态
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//检测队列在线程池关闭或停止时,是否为空
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
//减少Worker线程数量
decrementWorkerCount();
return null;
}
//获取线程池中线程数量
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//从任务队列中获取任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
//任务不为空,直接返回
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
-
beforeExecute(Thread t, Runnable r)
protected void beforeExecute(Thread t, Runnable r) { }
此方法体为空,说明可以创建ThreadPoolExecutor的子类,来重写该方法,这样可在线程池实际执行任务前,执行我们自定义的前置逻辑;
-
afterExecute(Runnable r, Throwable t)
protected void afterExecute(Runnable r, Throwable t) { }
同上,我们可以在子类中重写此方法,这样可在线程池执行任务之后,执行我们自定义的后置处理逻辑
-
processWorkerExit(Worker w, boolean completedAbruptly)
此方法的主要逻辑是执行退出Worker线程逻辑,并执行一些清理工作;
-
tryTerminate()
final void tryTerminate() { //自旋 for (;;) { //获取ctl int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; if (workerCountOf(c) != 0) { // Eligible to terminate //若当前线程池中线程数量不为0,则中断线程 interruptIdleWorkers(ONLY_ONE); return; } //获取线程池的全局锁 final ReentrantLock mainLock = this.mainLock; //加锁 mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { //将线程状态设置为TERMINATED ctl.set(ctlOf(TERMINATED, 0)); //唤醒所有因调用awaitTermination()而阻塞的线程 termination.signalAll(); } return; } } finally { //释放锁 mainLock.unlock(); } // else retry on failed CAS } }
-
terminate()
protected void terminated() { }
此方法体为空,我们可在其子类中重写该方法,这样在tryTerminated()方法中,可以执行我们自定义的该 方法;
源码解析线程池如何实现优雅退出
shutdown()
使用线程池的时候,当调用了shutdown()方法时,线程池将不再接收新提交任务,已经运行中的线程将继续执行;
此方法是非阻塞方法,调用后会立即返回,并不会等待所有线程任务执行完成才返回;
public void shutdown() {
//获取线程池的全局锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//检查是否有关闭线程池的权限
checkShutdownAccess();
//将当前线程池的状态设置为SHUTDOWN
advanceRunState(SHUTDOWN);
//中断woker线程
interruptIdleWorkers();
//调用ScheduledThreadPoolExecutor的钩子函数
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
//释放锁
mainLock.unlock();
}
tryTerminate();
}
shutdownNow()
如果调用了线程池的shutdownNow()方法,那么线程次将不再接收新提交的任务,workQueue队列中的线程也将被丢弃,运行中的线程将被中断;方法会立即返回,返回结果为任务队列workQueue中被丢弃的任务列表
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
//清空、丢弃队列中任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
//返回任务列表
return tasks;
}
awaitTermination(long timeout, TimeUnit unit)
当线程池调用awaitTermination时,会阻塞调用者所在线程,直到线程池状态变为TERNINATED才返回,或者到达了超时时间返回
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
//释放锁
mainLock.unlock();
}
}
此方法总体逻辑为:先获取Worker线程的独占锁,然后自旋,并判断线程池状态变为TERMINATED;如果是,则返回true,否则检测是否超时,若超时,则返回false;未超时则重置超时剩余时长;
AQS中的关键类
CountDownLatch
- 概述
同步辅助类,它可以阻塞当前线程的执行。也就是可以实现一个或多个线程一直等待,直到其他线程执行完毕。使用一个给定的计数器进行初始化,该计数器的操作是原子操作,即同一时刻只有一个线程可操作计数器。
调用改类的await()方法的线程会一直等待,直到其他线程调用该类的countDown()方法,并使当前计数器的值为0为止;
每次调用该类的countDown()方法,都会让改计数器值减一;
当该计数器的值减小为0时,所有因调用await()方法而阻塞等待的线程,都将继续往下执行;这种操作只能出现一次,因为该计数器的值不能重置;
如果需要一个可以重置计数次数的版本,可以考虑使用CyclicBarrier.
CountDownLatch支持给定超时时间的等待,超过给定时间不在等待;使用时只需在await()方法中传入给定时间即可;
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
-
使用场景
在某些场景中,程序需要等待某个或某些条件完成后,才能继续执行后续的操作。典型的应用为并行计算:当处理某个运算量很大的任务时,可拆分为多个小任务,等待所有子任务完后,父任务再拿到所有子任务的结果进行汇总
-
代码示例
调用ExecutorService的shutdown()方法,并不会第一时间将线程全都销毁掉,而是让当前已有线程全部执行完,
之后再把线程池销毁掉
package io.binghe.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @Slf4j public class CountDownLatchExample { private static final int threadCount = 200; public static void main(String[] args) throws InterruptedException { ExecutorService exec = Executors.newCachedThreadPool(); final CountDownLatch countDownLatch = new CountDownLatch(threadCount); for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { test(threadNum); } catch (InterruptedException e) { e.printStackTrace(); } finally { countDownLatch.countDown(); } }); } countDownLatch.await(); log.info("finish"); exec.shutdown(); } private static void test(int threadNum) throws InterruptedException { Thread.sleep(100); log.info("{}", threadNum); } }
支持给定时间等待的代码如下:
package io.binghe.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @Slf4j public class CountDownLatchExample { private static final int threadCount = 200; public static void main(String[] args) throws InterruptedException { ExecutorService exec = Executors.newCachedThreadPool(); final CountDownLatch countDownLatch = new CountDownLatch(threadCount); for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { test(threadNum); } catch (InterruptedException e) { e.printStackTrace(); } finally { countDownLatch.countDown(); } }); } countDownLatch.await(10, TimeUnit.MICROSECONDS); log.info("finish"); exec.shutdown(); } private static void test(int threadNum) throws InterruptedException { Thread.sleep(100); log.info("{}", threadNum); } }
Semaphore
-
概述
控制同一时间并发线程的数目。能够完成对于信号量的控制,可控制某个资源被同时访问的线程个数。
提供了两个核心方法:acquire()和release()
acquire()表示获取一个访问许可,没有获得则阻塞等待;release()则是在完成后释放一个许可。
Semaphore维护了当前可访问的个数;通过同步机制来控制可同时访问的个数。
Semaphore可以实现有限大小的链表
-
使用场景
Semaphore常用于可有限访问的资源
-
代码示例
package io.binghe.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; @Slf4j public class SemaphoreExample { private static final int threadCount = 200; public static void main(String[] args) throws InterruptedException { ExecutorService exec = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(3); for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { semaphore.acquire(); //获取一个许可 test(threadNum); semaphore.release(); //释放一个许可 } catch (InterruptedException e) { e.printStackTrace(); } }); } exec.shutdown(); } private static void test(int threadNum) throws InterruptedException { log.info("{}", threadNum); Thread.sleep(1000); } }
每次获取并释放多个许可
package io.binghe.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; @Slf4j public class SemaphoreExample { private static final int threadCount = 200; public static void main(String[] args) throws InterruptedException { ExecutorService exec = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(3); for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { semaphore.acquire(3); //获取多个许可 test(threadNum); semaphore.release(3); //释放多个许可 } catch (InterruptedException e) { e.printStackTrace(); } }); } log.info("finish"); exec.shutdown(); } private static void test(int threadNum) throws InterruptedException { log.info("{}", threadNum); Thread.sleep(1000); } }
假设有这样一个场景,假设系统当前允许的最大并发数是3,超过3之后就需要丢弃,这样的场景也可通过Semaphore来实现:
package io.binghe.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; @Slf4j public class SemaphoreExample { private static final int threadCount = 200; public static void main(String[] args) throws InterruptedException { ExecutorService exec = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(3); for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { //尝试获取一个许可,也可以尝试获取多个许可, //支持尝试获取许可超时设置,超时后不再等待后续线程的执行 //具体可以参见Semaphore的源码 if (semaphore.tryAcquire()) { test(threadNum); semaphore.release(); //释放一个许可 } } catch (InterruptedException e) { e.printStackTrace(); } }); } log.info("finish"); exec.shutdown(); } private static void test(int threadNum) throws InterruptedException { log.info("{}", threadNum); Thread.sleep(1000); } }
CyclicBarrier
-
概述
是一个同步辅助类,允许一组线程相互等待,直到到达某个公共的屏障点;通过它可以完成多个线程之间相互等待,只有每个线程都准备就绪后,才允许各自继续往下执行。
与countDownLatch相似的地方,都是使用计数器实现,当某个线程调用了CyclicBarrier的await()方法后,就进入了等待状态,而且计数器执行加一操作;当计数器的值增加到设置的初始值,所有因await()方法进入等待状态的线程将被唤醒,继续执行各自后续的操作。CyclicBarrier在释放等待线程后可以重用,因此CyclicBarrier又被成为循环屏障
-
使用场景
可以用于多线程计算数据,最后合并计算结果的场景
-
和countDownLatch的区别
- countDownLatch的计数器只能使用一次;而CyclicBarrier的计数器可使用reSet()重置,循环使用
- countDownLatch实现的是1个或n个线程等待其他线程完成后,才能继续往下执行,描述的是1个或多个线程等待其他线程的关系;而CyclicBarrier主要是一个线程组内线程相互等待,知道每个线程都满足了共有条件,才继续往下执行,描述的多个线程内部相互等待的关系。
- CyclicBarrier可以处理更复杂的场景,当计算出错时,可以重置计数器,让线程重新再执行一次
- CyclicBarrier提供了更多有用的方法,比如getNumberByWaiting()可获取等待线程的数量,可通过isBroken()方法判断线程是否被中断
-
代码示例
package io.binghe.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @Slf4j public class CyclicBarrierExample { private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int threadNum = i; Thread.sleep(1000); executorService.execute(() -> { try { race(threadNum); } catch (Exception e) { e.printStackTrace(); } }); } executorService.shutdown(); } private static void race(int threadNum) throws Exception { Thread.sleep(1000); log.info("{} is ready", threadNum); cyclicBarrier.await(); log.info("{} continue", threadNum); } }
设置等待超时示例代码如下:
package io.binghe.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.*; @Slf4j public class CyclicBarrierExample { private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int threadNum = i; Thread.sleep(1000); executorService.execute(() -> { try { race(threadNum); } catch (Exception e) { e.printStackTrace(); } }); } executorService.shutdown(); } private static void race(int threadNum) throws Exception { Thread.sleep(1000); log.info("{} is ready", threadNum); try { cyclicBarrier.await(2000, TimeUnit.MILLISECONDS); } catch (BrokenBarrierException | TimeoutException e) { log.warn("BarrierException", e); } log.info("{} continue", threadNum); } }
在声明CyclicBarrier的时候,还可以指定一个Runnable,当线程到达屏障的时候,可以优先执行Runnable的方法,示例代码如下:
package io.binghe.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @Slf4j public class CyclicBarrierExample { private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> { log.info("callback is running"); }); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int threadNum = i; Thread.sleep(1000); executorService.execute(() -> { try { race(threadNum); } catch (Exception e) { e.printStackTrace(); } }); } executorService.shutdown(); } private static void race(int threadNum) throws Exception { Thread.sleep(1000); log.info("{} is ready", threadNum); cyclicBarrier.await(); log.info("{} continue", threadNum); } }
AQS中的关键锁
ReentrantLock
-
概述
Java中提供的锁主要分为两类,一类是Synchronized修饰的锁,另一类就是JUC中提供的锁,而JUC中的核心锁就是ReentrantLock
ReentrantLock和Synchronized的区别:
-
可重入性
二者都是同一个线程进入一次,锁的计数器就加一,当锁的计数器下降为0时,才会释放锁
-
锁的实现
Synchronized是基于JVM实现的;而ReentrantLock是基于JDK实现的
-
性能区别
Synchronized优化之前性能比ReentrantLock差很多;但JDK6之后,Synchronized引入偏向锁、轻量级锁(即自旋锁)之后,性能差不多了
-
功能区别
便利性:
Synchronized使用起来较方便,并且由编译器加锁和释放锁;ReentrantLock需要手动加锁和释放锁,最好是在finally中释放锁
灵活度和细粒度:
ReentrantLock 这里优于Synchronized
ReentrantLock独有的功能:
- ReentrantLock 可以指定公平锁还是非公平锁。而Synchronized只能使用非公平锁。公平锁的意思就是先等待的线程先获得锁
- 提供了一个Condition类,可以分组唤醒需要唤醒的线程。而Synchronized只能随机的唤醒一个线程,或者唤醒全部线程
- 提供能够中断等待锁的线程的机制,lock.lockInterruptily()。ReentrantLock 实现是一种自旋锁,通过调用CAS操作来实现加锁,性能比较好是因为避免了使线程进入内核态的阻塞状态
- 总的来说,Synchronized能做的,ReentrantLock 都能做。性能上,ReentrantLock 比Synchronized的要好
Synchronized的优势:
- 不用手动释放锁,JVM自动处理,如果出现异常,JVM也会自动释放锁
- JVM进行锁定管理请求和释放时,JVM在生成线程转储时能够锁定信息,这些信息对调试非常有用,因为他们能够标识死锁和其他异常行为来源。而ReentrantLock 只是普通的类,JVM不知道哪个线程拥有锁
- Synchronized可以再所有版本的JVM上使用,ReentrantLock 在某些1.5版本之前的JVM可能不支持
ReentrantLock中部分方法说明:
- boolean tryLock():仅在调用时锁定未被另一个线程保持的情况下才获取锁定
- boolean tryLock(long timeout, TimeUnit unit):如果锁定在给定时间没有被另一个线程保持,且当前线程没有中断,则获取这个锁定
- void lockInterruptibly():若当前线程没有被中断,就获取锁定;若被中断,则抛出异常
- boolean isLocked():查询此锁定是否由任意线程所保持
- boolean isHeldByCurrentThread():查询当前线程是否保持锁定状态
- boolean isFair():判断是否是公平锁
- boolean hasQueuedThread(Thread thread):查询指定线程是否在等待获取此锁定
- boolean hasQueuedThreads():是否有线程在等待获取此锁定
- int getHoldCount():查询当前线程保持锁定的个数
-
-
代码示例
package io.binghe.concurrency.example.lock; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @Slf4j public class LockExample { //请求总数 public static int clientTotal = 5000; //同时并发执行的线程数 public static int threadTotal = 200; public static int count = 0; private static final Lock lock = new ReentrantLock(); public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { executorService.execute(() -> { try { semaphore.acquire(); add(); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("count:{}", count); } private static void add() { lock.lock(); try { count++; } finally { lock.unlock(); } } }
ReentrantReadWriteLock
-
概述
读写锁,在没有任何读锁的时候,才可以获取写锁;若一直无法获得写锁,就会导致写锁饥饿;
在读多写少的场景,ReentrantReadWriteLock性能比ReentrantLock高不少,在多线程读时互不影响,不像ReentrantLock即使是多线程读,也需要每个线程获取读锁;不过任何一个线程在写的时候,就和ReentrantLock类似,无论其他线程是读还是写,都必选获取写锁。需要注意的是同一个线程,可以同时持有读锁和写锁。
-
代码示例
package io.binghe.concurrency.example.lock; import lombok.extern.slf4j.Slf4j; import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @Slf4j public class LockExample { private final Map<String, Data> map = new TreeMap<>(); private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final Lock readLock = lock.readLock(); private final Lock writeLock = lock.writeLock(); public Data get(String key) { readLock.lock(); try { return map.get(key); } finally { readLock.unlock(); } } public Set<String> getAllKeys() { readLock.lock(); try { return map.keySet(); } finally { readLock.unlock(); } } public Data put(String key, Data value) { writeLock.lock(); try { return map.put(key, value); } finally { writeLock.unlock(); } } class Data { } }
StampedLock
- 概述
StampedLock是ReentrantReadWriteLock的实现,主要不同是StampedLock不允许重入,多了乐观读的功能,使用上会更加复杂一些,但是会有更好的性能表现。
StampedLock控制锁的三种模式:读、写、乐观读
StampedLock的状态由版本和模式两个部分组成,锁获取方法返回的是一个数字所谓票据,用相应的锁状态来表示并控制相关的访问,数字0表示写锁没有被授权访问
在读锁上分为悲观锁和乐观锁,乐观读就是在读多写少的场景,乐观的认为写入和读取同时发生的几率很小,因此,乐观的完全用读锁锁定。程序可以查看读取之后,是否遭到写入进行了变更,再采取后续的措施,这样的改进可大幅提升程序的吞吐量
总之,在读线程越来越多的场景下,StampedLock大幅提升了程序的吞吐量
import java.util.concurrent.locks.StampedLock;
class Point {
private double x, y;
private final StampedLock sl = new StampedLock();
void move(double deltaX, double deltaY) { // an exclusively locked method
long stamp = sl.writeLock();
try {
x += deltaX;
y += deltaY;
} finally {
sl.unlockWrite(stamp);
}
}
//下面看看乐观读锁案例
double distanceFromOrigin() { // A read-only method
long stamp = sl.tryOptimisticRead(); //获得一个乐观读锁
double currentX = x, currentY = y; //将两个字段读入本地局部变量
if (!sl.validate(stamp)) { //检查发出乐观读锁后同时是否有其他写锁发生?
stamp = sl.readLock(); //如果没有,我们再次获得一个读悲观锁
try {
currentX = x; // 将两个字段读入本地局部变量
currentY = y; // 将两个字段读入本地局部变量
} finally {
sl.unlockRead(stamp);
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}
//下面是悲观读锁案例
void moveIfAtOrigin(double newX, double newY) { // upgrade
// Could instead start with optimistic, not read mode
long stamp = sl.readLock();
try {
while (x == 0.0 && y == 0.0) { //循环,检查当前状态是否符合
long ws = sl.tryConvertToWriteLock(stamp); //将读锁转为写锁
if (ws != 0L) { //这是确认转为写锁是否成功
stamp = ws; //如果成功 替换票据
x = newX; //进行状态改变
y = newY; //进行状态改变
break;
} else { //如果不能成功转换为写锁
sl.unlockRead(stamp); //我们显式释放读锁
stamp = sl.writeLock(); //显式直接进行写锁 然后再通过循环再试
}
}
} finally {
sl.unlock(stamp); //释放读锁或写锁
}
}
}
-
代码示例
package io.binghe.concurrency.example.lock; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.locks.StampedLock; @Slf4j public class LockExample { //请求总数 public static int clientTotal = 5000; //同时并发执行的线程数 public static int threadTotal = 200; public static int count = 0; private static final StampedLock lock = new StampedLock(); public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { executorService.execute(() -> { try { semaphore.acquire(); add(); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("count:{}", count); } private static void add() { //加锁时返回一个long类型的票据 long stamp = lock.writeLock(); try { count++; } finally { //释放锁的时候带上加锁时返回的票据 lock.unlock(stamp); } } }
我们可以这样初步判断选择Synchronized还是ReentrantLock:
- 当只有少量竞争者时,Synchronized是一个很好的通用锁实现
- 竞争者不少,但是线程的增长趋势是可预估的,此时使用ReentrantLock是个很好的通用锁实现
- Synchronized不会引发死锁,其他锁使用不当可能会引发死锁
Condition
-
概述
Condition是一个多线程间协调通讯的工具类,使用它可以有更好的灵活性,比如可以实现多路通知功能,也就是一个Lock对象里,可以创建多个Condition实例,线程对象可以注册在指定的Condition中,从而选择性的进行线程通知,在调度线程上更加灵活
-
特点
- Condition的前提是Lock,由AQS中的newCondition()方法来创建Condition对象
- Condition的await()方法表示线程从AQS中删除,并释放线程获取的锁;并进入Condition等待队列,等待被通知
- Condition的signal()方法表示唤醒Condition等待队列中的节点,准备获取锁
-
代码示例
package io.binghe.concurrency.example.lock; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; @Slf4j public class LockExample { public static void main(String[] args) { ReentrantLock reentrantLock = new ReentrantLock(); Condition condition = reentrantLock.newCondition(); new Thread(() -> { try { reentrantLock.lock(); log.info("wait signal"); // 1 condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } log.info("get signal"); // 4 reentrantLock.unlock(); }).start(); new Thread(() -> { reentrantLock.lock(); log.info("get lock"); // 2 try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } condition.signalAll(); log.info("send signal ~ "); // 3 reentrantLock.unlock(); }).start(); } }
ThreadLocal
- 概述
ThreadLocal是JDK提供的,支持线程本地变量。意思是ThreadLocal中存放的变量属于当前线程,该变量对其他线程而言是隔离的,也就是说该变量是当前线程独有的变量。
如果我们创建了一个ThreadLocal变量,则访问这个变量的每个线程都会拥有这个变量的本地副本,当多个线程对这个变量进行操作时,实际上是操作的该变量的本地副本,从而避免了线程安全问题
-
使用示例
使用ThreadLocal保存并打印相关变量信息
public class ThreadLocalTest { private static ThreadLocal<String> threadLocal = new ThreadLocal<String>(); public static void main(String[] args) { //创建第一个线程 Thread threadA = new Thread(() -> { threadLocal.set("ThreadA:" + Thread.currentThread().getName()); System.out.println("线程A本地变量中的值为:" + threadLocal.get()); }); //创建第二个线程 Thread threadB = new Thread(() -> { threadLocal.set("ThreadB:" + Thread.currentThread().getName()); System.out.println("线程B本地变量中的值为:" + threadLocal.get()); }); //启动线程A和线程B threadA.start(); threadB.start(); } }
运行程序,打印信息如下:
线程A本地变量中的值为:ThreadA:Thread-0 线程B本地变量中的值为:ThreadB:Thread-1
此时,我们为线程A增加删除变量操作:
public class ThreadLocalTest { private static ThreadLocal<String> threadLocal = new ThreadLocal<String>(); public static void main(String[] args) { //创建第一个线程 Thread threadA = new Thread(() -> { threadLocal.set("ThreadA:" + Thread.currentThread().getName()); System.out.println("线程A本地变量中的值为:" + threadLocal.get()); threadLocal.remove(); System.out.println("线程A删除本地变量后ThreadLocal中的值为:" + threadLocal.get()); }); //创建第二个线程 Thread threadB = new Thread(() -> { threadLocal.set("ThreadB:" + Thread.currentThread().getName()); System.out.println("线程B本地变量中的值为:" + threadLocal.get()); System.out.println("线程B没有删除本地变量:" + threadLocal.get()); }); //启动线程A和线程B threadA.start(); threadB.start(); } }
打印信息如下:
线程A本地变量中的值为:ThreadA:Thread-0 线程B本地变量中的值为:ThreadB:Thread-1 线程B没有删除本地变量:ThreadB:Thread-1 线程A删除本地变量后ThreadLocal中的值为:null
通过上述程序,我们可以看出:线程A和线程B存储在ThreadLocal中的变量互不干扰,线程A存储的变量只能线程A可以访问,线程B存储的变量只能由线程B访问
-
ThreadLocal原理
public class Thread implements Runnable { /***********省略N行代码*************/ ThreadLocal.ThreadLocalMap threadLocals = null; ThreadLocal.ThreadLocalMap inheritableThreadLocals = null; /***********省略N行代码*************/ }
由以上源码可看出,ThreadLocal类中存在threadLocals和inheritableThreadLocals,这两个变量都是ThreadLocalMap类型的变量,而且二者的初始值都为null,只有当前线程第一次调用set()方法或get()方法时才会实例化变量。
须注意的时,每个线程的本地变量不是存放在ThreadLocal实例里面的,而是存放在调用线程的threadLocals变量里面的;也就是说,调用ThreadLocal的set()方法,存储的本地变量是存放在具体调用线程的内存空间中的,而ThreadLocal只提供了get()和set()方法来访问本地变量值;当调用set()方法时,将要设置的值存储在调用线程的threadLocals中,当调用get()方法来取值时,将从当前线程获取threadLocals中存放的变量;
-
set()
public void set(T value) { //获取当前线程 Thread t = Thread.currentThread(); //以当前线程为key,获取ThreadLocalMap对象 ThreadLocalMap map = getMap(t); if (map != null) //获取的获取ThreadLocalMap不为空,则赋值操作 map.set(this, value); else //获取ThreadLocalMap为空,则为当前线程创建并赋值 createMap(t, value); } /** * Create the map associated with a ThreadLocal. Overridden in * InheritableThreadLocal. * * @param t the current thread * @param firstValue value for the initial entry of the map */ void createMap(Thread t, T firstValue) { t.threadLocals = new ThreadLocalMap(this, firstValue); }
-
get()
/** * Returns the value in the current thread's copy of this * thread-local variable. If the variable has no value for the * current thread, it is first initialized to the value returned * by an invocation of the {@link #initialValue} method. * * @return the current thread's value of this thread-local */ public T get() { //获取当前线程 Thread t = Thread.currentThread(); //获取当前线程ThreadLocalMap ThreadLocalMap map = getMap(t); if (map != null) { //ThreadLocalMap不为空,则从中取值 ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) { @SuppressWarnings("unchecked") T result = (T)e.value; return result; } } //ThreadLocalMap为空,则返回默认值 return setInitialValue(); }
-
remove()
public void remove() { //获取当前线程中的threadLocals ThreadLocalMap m = getMap(Thread.currentThread()); if (m != null) //若threadLocals不为空,则清除 m.remove(this); }
注意:若线程一直不终止,则本地变量会一直存在于调用线程的ThreadLocal中;所以,如果不需要本地变量时,可以调用ThreadLocal的remove()方法删除,以免出现内存溢出问题
-
-
ThreadLocal变量不具传递性
使用ThreadLocal存储的本地变量,不具有传递性;也就是说同一个ThreadLocal,在父线程中设置值后,在子线程中是获取不到值的;
public class ThreadLocalTest { private static ThreadLocal<String> threadLocal = new ThreadLocal<String>(); public static void main(String[] args) { //在主线程中设置值 threadLocal.set("ThreadLocalTest"); //在子线程中获取值 Thread thread = new Thread(new Runnable() { @Override public void run() { System.out.println("子线程获取值:" + threadLocal.get()); } }); //启动子线程 thread.start(); //在主线程中获取值 System.out.println("主线程获取值:" + threadLocal.get()); } }
运行上述字段代码之后,结果如下所示:
主线程获取值:ThreadLocalTest 子线程获取值:null
通过上述示例可以看出,在父线程中对ThreadLocal进行设置值之后,在子线程中是取不到这个值的;
那有没有什么办法,可以在子线程中取到父线程的值呢?我们可以通过InheritableThreadLocal来实现;
-
InheritableThreadLocal
InheritableThreadLocal类继承自ThreadLocal,它可以实现在子线程中获取到父线程中设置的值;
public class ThreadLocalTest { private static InheritableThreadLocal<String> threadLocal = new InheritableThreadLocal<String>(); public static void main(String[] args) { //在主线程中设置值 threadLocal.set("ThreadLocalTest"); //在子线程中获取值 Thread thread = new Thread(new Runnable() { @Override public void run() { System.out.println("子线程获取值:" + threadLocal.get()); } }); //启动子线程 thread.start(); //在主线程中获取值 System.out.println("主线程获取值:" + threadLocal.get()); } }
运行上述程序,结果如下:
主线程获取值:ThreadLocalTest 子线程获取值:ThreadLocalTest
可以看出,使用InheritableThreadLocal,子线程可以取到父线程中设置的本地变量;
-
InheritableThreadLocal原理
public class InheritableThreadLocal<T> extends ThreadLocal<T> { protected T childValue(T parentValue) { return parentValue; } /** * Get the map associated with a ThreadLocal. * * @param t the current thread */ ThreadLocalMap getMap(Thread t) { return t.inheritableThreadLocals; } /** * Create the map associated with a ThreadLocal. * * @param t the current thread * @param firstValue value for the initial entry of the table. */ void createMap(Thread t, T firstValue) { t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue); } }
由源码可知,InheritableThreadLocal继承自ThreadLocal,并重写了childValue()/getMap()/createMap()方法,
也就是说,当调用ThreadLocal的set()方法时,创建的是当前线程的inheritableThreadLocals变量,而不是threadLocals变量;此时如果父线程创建子线程,在Thread类的构造函数中,会把父线程的inheritableThreadLocals变量中的本地变量复制一份到子线程的inheritableThreadLocals中。
并发问题
可见性问题
可见性问题,即一个线程对共享变量进行了修改,另一线程不能立刻见到这种修改,这是由CPU添加了缓存导致的;
单核CPU不存在可见性问题,多核CPU才存在可见性问题;单核CPU由于是单核时间片调度,实际是串行执行;而多核则可以实现并行;
可见性示例代码:
public class ConcurrentTest {
private int num = 0;
public static void main(String[] args) throws InterruptedException {
ConcurrentTest concurrentTest = new ConcurrentTest();
concurrentTest.threadsTest();
}
public void threadsTest() throws InterruptedException {
Thread thread = new Thread("test-1") {
@Override
public void run() {
for (int i = 0; i < 20; i++) {
try {
addNum();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
Thread thread2 = new Thread("test-2") {
@Override
public void run() {
for (int i = 0; i < 20; i++) {
try {
addNum();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
thread.start();
thread2.start();
thread.join();
thread2.join();
System.out.println("执行完毕");
}
private void addNum() throws InterruptedException {
Thread.sleep(1000);
num++;
System.out.println(Thread.currentThread().getName() + ":" + num);
}
}
原子性问题
原子性是指一个或多个操作,在CPU执行过程中不被中断的特性。原子性操作一旦开始运行,就会一直到运行结束为止,中间不会有中断的情况发生。
原子性问题,是指一个或多个操作,在CPU执行过程中,出现了中断的情况;
线程在执行某项操作时,此时由于CPU切换,转而去执行其他任务,导致当前任务中断,这就会造成原子性问题;
在JAVA中,并发程序是基于多线程来编写的,这也会涉及到CPU对于线程的切换问题,正是由于CPU对线程的切换,导致并发编程可能出现原子性问题;
有序性问题
有序性是指:代码按执行顺序执行
指令重排序:编译器或解释器为了优化程序性能,有时会修改程序的执行顺序,但这种修改,可能会导致意想不到的问题;
在单线程情况下,指令重排序仍可保证最终结果与程序顺序执行结果一致,但在多线程情况下,可能就会存在问题;
有序性问题:CPU为了对程序进行优化,进行指令重排序,此时执行顺序可能和编码顺序不一致,这可能会引起有序性问题;
总结
导致并发编程可能导致问题的主要原因有三个:缓存导致的可见性问题,CPU线程切换导致的原子性问题和性能优化指令重排序导致的有序性问题
Happens-Before原则
JDK1.5版本中,JAVA内存模型引入了Happens-Before原则
示例代码1:
class VolatileExample {
int x = 0;
volatile boolean v = false;
public void writer() {
x = 1;
v = true;
}
public void reader() {
if (v == true) {
//x的值是多少呢?
}
}
}
-
程序次序规则
单个线程中,按照代码顺序,前面的操作Happens-Before于后面的任意操作
例如:示例1中,代码 x = 1会在 v = true 之前完成;
-
volatile变量规则
对一个volatile的写操作,Happens-Before于后续对它的读操作
-
传递规则
若A Happens-Before B,B Happens-Before C,则有 A Happens-Before C
结合原则1、2、3和示例代码1,我们可得出如下结论:
- x = 1 Happens-Before v = true ,符合原则1;
- 写变量 v = true Happens-Before 读变量 v = true,符合原则2
- 则根据原则3得出,x = 1 Happens-Before 读变量 v = true;
- 就是也就是说,如果线程B读取到了 v = true,那么线程A设置的 x = 1 对于B就是可见的,就是说此时的B线程可以访问到 x = 1
-
锁定规则
对一个锁的解锁操作Happens-Before于后续该锁的加锁操作
-
线程启动规则
若线程A调用线程B的start()启动线程B,则start()方法 Happens-Before 于线程B中的任意操作
//在线程A中初始化线程B Thread threadB = new Thread(()->{ //此处的变量x的值是多少呢?答案是100 }); //线程A在启动线程B之前将共享变量x的值修改为100 x = 100; //启动线程B threadB.start();
-
线程终结规则
线程A等待线程B完成(调用线程B的join()方法),当线程B完成后(线程B的join()方法返回),则线程A能够访问到线程B对共享变量的修改
Thread threadB = new Thread(()-{ //在线程B中,将共享变量x的值修改为100 x = 100; }); //在线程A中启动线程B threadB.start(); //在线程A中等待线程B执行完成 threadB.join(); //此处访问共享变量x的值为100
-
线程中断规则
对线程interrupt()方法的调用,Happens-Before于被中断线程检测到中断事件发生
-
对象终结规则
一个对象的初始化完后才Happens-Before于该对象finilize()方法的开始
ForkJoin框架
-
概述
Java1.7引入了一种新的并发框架——Fork/Join框架;主要用于实现“分而治之”的算法,特别是分而治之后递归调用的函数。
Fork/Join框架的本质是一个并行执行任务的框架,能够把一个大任务分割成多个小任务,最终汇总每个小任务的结果得到大任务的结果。Fork/Join框架与ThreadPool共存,并不是要替换ThreadPool
-
原理
Fork/Join使用无限队列来保存需要执行的任务,而线程数量则是通过构造函数传入,若没有传入线程数量,则当前计算机可用CPU数量会被默认设置为线程数量。
ForkjoinPool主要采用分治法来解决问题。典型应用比如快速排序算法。
-
框架实现
-
ForkJoinPool
实现了ForkJoin框架中的线程池
-
ForkJoinWorkerThread
实现ForkJoin框架中的线程
-
ForkJoinTask
封装了数据及其相应的运算,并且支持细粒度的数据并行。
ForkJoinTask主要包括两个方法fork()和join(),分别实现任务的分拆和合并;
fork()方法类似于Thread方法中的start()方法,但是它并不立即执行任务,而是将任务放入执行队列中。
join()方法和Thread的join()方法不同,它并不简单的阻塞线程;而是利用工作线程执行其他任务,当一个工作线程调用join(),它将处理其他任务,直到注意到子线程执行完成。
-
RecursiveTask
有返回结果的ForkJoinTask实现Callable
-
RecursiveAction
无返回结果的ForkJoinTask实现Runnalbe
-
CountedCompleter
在任务执行完成后会触发执行一个自定义的钩子函数
-
-
示例代码
public class ForkJoinTaskExample extends RecursiveTask<Integer> { public static final int threshold = 2; private int start; private int end; public ForkJoinTaskExample(int start, int end) { this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; //如果任务足够小就计算任务 boolean canCompute = (end - start) <= threshold; if (canCompute) { for (int i = start; i <= end; i++) { sum += i; } } else { // 如果任务大于阈值,就分裂成两个子任务计算 int middle = (start + end) / 2; ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle); ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end); // 执行子任务 leftTask.fork(); rightTask.fork(); // 等待任务执行结束合并其结果 int leftResult = leftTask.join(); int rightResult = rightTask.join(); // 合并子任务 sum = leftResult + rightResult; } return sum; } public static void main(String[] args) { ForkJoinPool forkjoinPool = new ForkJoinPool(); //生成一个计算任务,计算1+2+3+4 ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100); //执行一个任务 Future<Integer> result = forkjoinPool.submit(task); try { log.info("result:{}", result.get()); } catch (Exception e) { log.error("exception", e); } } }
v = true 之前完成;
-
volatile变量规则
对一个volatile的写操作,Happens-Before于后续对它的读操作
-
传递规则
若A Happens-Before B,B Happens-Before C,则有 A Happens-Before C
结合原则1、2、3和示例代码1,我们可得出如下结论:
- x = 1 Happens-Before v = true ,符合原则1;
- 写变量 v = true Happens-Before 读变量 v = true,符合原则2
- 则根据原则3得出,x = 1 Happens-Before 读变量 v = true;
- 就是也就是说,如果线程B读取到了 v = true,那么线程A设置的 x = 1 对于B就是可见的,就是说此时的B线程可以访问到 x = 1
-
锁定规则
对一个锁的解锁操作Happens-Before于后续该锁的加锁操作
-
线程启动规则
若线程A调用线程B的start()启动线程B,则start()方法 Happens-Before 于线程B中的任意操作
//在线程A中初始化线程B Thread threadB = new Thread(()->{ //此处的变量x的值是多少呢?答案是100 }); //线程A在启动线程B之前将共享变量x的值修改为100 x = 100; //启动线程B threadB.start();
-
线程终结规则
线程A等待线程B完成(调用线程B的join()方法),当线程B完成后(线程B的join()方法返回),则线程A能够访问到线程B对共享变量的修改
Thread threadB = new Thread(()-{ //在线程B中,将共享变量x的值修改为100 x = 100; }); //在线程A中启动线程B threadB.start(); //在线程A中等待线程B执行完成 threadB.join(); //此处访问共享变量x的值为100
-
线程中断规则
对线程interrupt()方法的调用,Happens-Before于被中断线程检测到中断事件发生
-
对象终结规则
一个对象的初始化完后才Happens-Before于该对象finilize()方法的开始
ForkJoin框架
-
概述
Java1.7引入了一种新的并发框架——Fork/Join框架;主要用于实现“分而治之”的算法,特别是分而治之后递归调用的函数。
Fork/Join框架的本质是一个并行执行任务的框架,能够把一个大任务分割成多个小任务,最终汇总每个小任务的结果得到大任务的结果。Fork/Join框架与ThreadPool共存,并不是要替换ThreadPool
-
原理
Fork/Join使用无限队列来保存需要执行的任务,而线程数量则是通过构造函数传入,若没有传入线程数量,则当前计算机可用CPU数量会被默认设置为线程数量。
ForkjoinPool主要采用分治法来解决问题。典型应用比如快速排序算法。
-
框架实现
-
ForkJoinPool
实现了ForkJoin框架中的线程池
-
ForkJoinWorkerThread
实现ForkJoin框架中的线程
-
ForkJoinTask
封装了数据及其相应的运算,并且支持细粒度的数据并行。
ForkJoinTask主要包括两个方法fork()和join(),分别实现任务的分拆和合并;
fork()方法类似于Thread方法中的start()方法,但是它并不立即执行任务,而是将任务放入执行队列中。
join()方法和Thread的join()方法不同,它并不简单的阻塞线程;而是利用工作线程执行其他任务,当一个工作线程调用join(),它将处理其他任务,直到注意到子线程执行完成。
-
RecursiveTask
有返回结果的ForkJoinTask实现Callable
-
RecursiveAction
无返回结果的ForkJoinTask实现Runnalbe
-
CountedCompleter
在任务执行完成后会触发执行一个自定义的钩子函数
-
-
示例代码
public class ForkJoinTaskExample extends RecursiveTask<Integer> { public static final int threshold = 2; private int start; private int end; public ForkJoinTaskExample(int start, int end) { this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; //如果任务足够小就计算任务 boolean canCompute = (end - start) <= threshold; if (canCompute) { for (int i = start; i <= end; i++) { sum += i; } } else { // 如果任务大于阈值,就分裂成两个子任务计算 int middle = (start + end) / 2; ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle); ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end); // 执行子任务 leftTask.fork(); rightTask.fork(); // 等待任务执行结束合并其结果 int leftResult = leftTask.join(); int rightResult = rightTask.join(); // 合并子任务 sum = leftResult + rightResult; } return sum; } public static void main(String[] args) { ForkJoinPool forkjoinPool = new ForkJoinPool(); //生成一个计算任务,计算1+2+3+4 ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100); //执行一个任务 Future<Integer> result = forkjoinPool.submit(task); try { log.info("result:{}", result.get()); } catch (Exception e) { log.error("exception", e); } } }