一、Java的线程实现方式
- 继承Thread类创建线程。
- 实现Runnable创建线程。
- 线程池
- 实现callable接口通过FutureTask包装器来创建Thread线程。
- 使用ExcuterService、Callable、Future实现有返回结果的线程。
二、继承Thread类创建线程。
其实Thread类已经重写了runnable方法。
public class ThreadExc {
public static void main(String[] args){
MyThread mt = new MyThread();
// 调用start方法开启一个新的线程,
// 如果直接调用run方法,则线程mt不会进入就绪态
mt.start();
System.out.println("主线程结束");
}
}
// 继承Thread类,重写run方法
class MyThread extends Thread{
@Override
public void run(){
for(int i=0; i<10 ;i++){
System.out.println("我是"+i);
}
}
}
三、实现Runnable接口创建线程。
private static void threadyield(){
Runnable r = new Runnable() {
@Override
public void run() {
for(int i=0; i<10 ;i++){
System.out.println("我是"+Thread.currentThread().getName()+i);
}
}
};
Thread t1 = new Thread(r,"t1");
t1.start();
}
lambda表达式创建线程
public static void main(String[] args){
// 通过runnable接口来实现
Runnable ra = () -> {
for(int i=0;i<10;i++){
System.out.println("线程2"+i);
}
};
Thread th = new Thread(ra);
th.start();
}
线程的命名
private static void threadname(){
Thread mt = new Thread();
// 线程的命名
mt.setName("custom");
// 通过构造方法进行命名
// Thread(Runnable r, String name) 通过这个接口进行实例化
Thread mt = new Thread(() -> {},"custom");
}
线程的休眠
Thread.sleep(1000); // 单位ms
线程的优先级
private static void threadlv(){
// 设置优先级只是会修改线程抢到cpu的概率
// 并不是优先级高的一定能抢到 优先级是【0,10】的整数默认是5
Runnable r = () -> {
for(int i=0; i<10 ;i++){
System.out.println(Thread.currentThread().getName()+":" + i);
}
};
Thread t1 = new Thread(r, "t1");
Thread t2 = new Thread(r, "t2");
// 设置优先级
t1.setPriority(10);
t2.setPriority(1);
t1.start();
t2.start();
}
线程的礼让
private static void threadyield(){
// 线程礼让就是让当前运行的线程让出cpu回到就绪态,并不是礼让了一定就会不会再抢到
Runnable r = new Runnable() {
@Override
public void run() {
for(int i=0; i<10 ;i++){
System.out.println("我是"+Thread.currentThread().getName()+i);
if(i == 3){
Thread.yield();
}
}
}
};
Thread t1 = new Thread(r,"t1");
Thread t2 = new Thread(r,"t2");
t1.start();
t2.start();
}
线程的阻塞
public static void main(String[] args){
Runnable ra = () -> {
for(int i=0;i<10;i++){
System.out.println("线程2"+i);
}
};
Thread th = new Thread(ra);
th.start();
th.join(20); // 会阻塞主线程
System.out.println("主线程结束");
}
线程的同步
private static void threadlock(){
// 声明一把锁
ReentrantLock lock = new ReentrantLock();
Runnable r = () -> {
while(sell.number>0){
//加锁 同步代码段 对象锁 需要看到同意把锁 用synchronized修饰的方法就是同步法
//如果是静态方法加锁就是对象锁用类.class,如果是非静态方法加索就是this
synchronized (""){
System.out.println(Thread.currentThread().getName()+"售出剩余票数"+--sell.number);
}
// 上锁
lock.lock();
System.out.println(Thread.currentThread().getName()+"售出剩余票数"+--sell.number);
//解锁
lock.unlock();
}
};
Thread t1 = new Thread(r,"t1");
Thread t2 = new Thread(r,"t2");
t1.start();
t2.start();
}
TIPS:synchronized和ReentrantLock的区别?
1.synchronized是关键字属于JVM层面(底层用到的monitor对象,monitorenter代表进入同步代码块,monitorexit代表退出同步代码块),lock是具体类是api层面。
2.synchronized是不可中断的同时也是非公平锁,ReentrantLock是可中断,自定义是否是公平锁。
3..ReentrantLock功能性方面更全面,比如时间锁等候,可中断锁等候,锁投票等,因此更有扩展性。在多个条件变量和高度竞争锁的地方,用ReentrantLock更合适,ReentrantLock还提供了Condition,对线程的等待和唤醒等操作更加灵活,一个ReentrantLock可以有多个Condition实例,所以更有扩展性。
线程的死锁
private synchronized static void deadlock(){
// wait 等待 object方法 让当前的线程释放自己的锁和Cpu 让当前线程处于就绪态
// notify 通知 object方法 唤醒等待队列的线程
// notifyall 通知 object方法 唤醒等待队列所有的线程
Runnable r1 = new Runnable() {
@Override
public void run() {
synchronized ("A"){
System.out.println("拿到A");
try {
"A".wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized ("B"){
System.out.println("拿到A和B");
}
}
}
};
Runnable r2 = new Runnable() {
@Override
public void run() {
synchronized ("B"){
System.out.println("拿到B");
synchronized ("A"){
System.out.println("拿到A和B");
"A".notifyAll();
}
}
}
};
Thread t1 = new Thread(r1);
Thread t2 = new Thread(r2);
t1.start();
t2.start();
}
TIPS:为什么wait,notify,notifyall等方法都定义在object类?
因为这些方法在操作同步线程时,都必须要标识它们操作线程的锁,只有同一个锁上的被等待线程,可以被同一个锁上的notify唤醒,不可以对不同锁中的线程进行唤醒。也就是说,等待和唤醒必须是同一个锁。而锁可以是任意对象,所以可以被任意对象调用的方法是定义在object类中。
四、线程池
通过Executors对象也可以创建线程
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);
// FixedThreadPool 固定线程数的线程池
// singleThreadExecutor 单线程的
// CachedThreadPool 带缓冲的
// 执行线程,也可以用submit方法
executorService.execute(() -> {
System.out.println("..");
});
// 关闭
executorService.shutdown();
}
然而在生产中不会使用上面的三种方式,上面的三种方式底层都是调用ThreadPoolExecutor进行创建。
线程池的工作流程:
当创建好线程池以后通过execute方法进行提交一个任务会先进行如下判断:
当前运行的线程数如果小于核心线程数那么就直接执行,否则就会放入到等待队列,当等待队列也满了以后就会创建线程一直到最大线程数(这时刚创建的线程会直接服务刚到的任务,并不会从阻塞队列中进行唤醒),当达到最大线程数以后如果还有线程到来,就会采取包和策略。
线程池的构造方法:
public ThreadPoolExecutor(
int corePoolSize, //核心线程数量
int maximumPoolSize, //最大线程数量
long keepAliveTime, //最大空闲时间
TimeUnit unit, //时间单位
BlockingQueue<Runnable> workQueue, //工作队列
ThreadFactory threadFactory, //线程工厂
RejectedExecutionHandler handler //饱和机制处理
)
工作队列不同会有不同的效果
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2,5,1L,TimeUnit.SECONDS
,new LinkedBlockingQueue<Runnable>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
//如果用 new LinkedBlockingQueue<>() 当任务数量超过核心线程数时会将任务放到队列中,重复使用线程
//如果用 new SynchronousQueue<Runnable>() 当任务数量超过核心线程数时会继续开辟线程
//如果用 new SynchronousDeque<Runnable>() 不受最大限制的印象,但是当他有大小限制时就会受影响
excuter.execute(r);
线程池的饱和机制:
new ThreadPoolExecutor.AbortPolicy()当超出范围就会报错
new ThreadPoolExecutor.CallerRunsPolicy当超出范围就会交给主线程处理
new ThreadPoolExecutor.DiscardPolicy()新提交的任务被抛弃
new ThreadPoolExecutor.DiscardOldestPolicy()删除最老的元素
对于线程池中核心线程的数量该如何去配置:
IO密集型:
CPU核数*2
CPU核数/1-阻塞系数 阻塞系数0.8--0.9
CPU密集型:
CPU的核数+1
线程复用原理(核心线程和非核心线程的区别)
线程在线程池内部被封装成了Worker对象,其继承了AQS,所以具有锁的特性。线程池通过addWorker方法进行线程的添加,然后调用 runWorker 方法进行任务的执行。
final void runWorker(ThreadPoolExecutor.Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 不断的获取任务
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
try {
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 进行退出线程
processWorkerExit(w, completedAbruptly);
}
}
执行完任务后不会退出线程的原因是因为有while循环,当通过getTask 无法获取到任务的时候就会进行退出线程,这里就是线程复用的主要原理。
private Runnable getTask() {
boolean timedOut = false; // 执行poll方法是否超时
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c); // 获取线程池中的数量
// 判断获取任务线程是否可以超时退出
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 非核心线程在这里进行线程的退出 并将线程池中的线程数减一
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 超时退出的原理,空闲一段时间就会退出
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
※:如果将 allowCoreThreadTimeOut 设置为true,那么所有线程都是可以进行超时退出的,包括核心线程。通过是否允许超时来决定用哪种方式从队列中获取任务。
线程池关闭方法 shutDown 和 shutDownNow 的区别
两者都会将线程池的状态修改为STOP,shutDown会继续执行阻塞队列中的任务,shutDownNow会将阻塞队列中的任务全部抛弃。
五、实现callable接口通过FutureTask包装器来创建Thread线程。
优点在于方法可以有返回值,并且可以抛出异常。
public class ThreadExc {
public static void main(String[] args){
MyThread mt = new MyThread();
// 执行Callable方式,需要FutureTask实现类的支持,用于接收运算结果
// 也可以通过result.isDone()查看是否计算完成
FutureTask<Integer> result = new FutureTask<>(mt);
new Thread(result).start();
Integer sum;
try {
// 获取值
sum = result.get();
System.out.println(sum);
}catch (Exception e){
e.printStackTrace();
}
}
}
// 实现Callable,需要添加返回类型
class MyThread implements Callable<Integer> {
@Override
public Integer call() throws Exception{
Integer sum = 0;
for(int i=0;i<10;i++){
sum += 1;
}
return sum;
}
}
六、使用ExcuterService、Callable、Future实现有返回结果的线程。
public static void main(String[] args){
// 创建一个线程池
ExecutorService pool = Executors.newFixedThreadPool(10);
// 返回结果
FutureTask result = new FutureTask<Integer>(new Callable<Integer>() {
@Override
public Integer call() throws Exception{
Integer sum = 0;
for(int i=0;i<10;i++){
System.out.println(sum);
sum += 1;
}
return sum;
}
});
// 执行
pool.submit(result);
// 关闭
pool.shutdown();
}
七、通过 CompletableFuture 使用多线程
// CompletableFuture 实现了Future、CompletionStage,比原来的Future更强大。
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {}
CompletableFuture 解决了Future接口的局限性,即规避了Future的get()方法的阻塞性,以及减少浪费CPU资源。
使用CompletableFuture
// 一般不推荐直接使用new的方式去创建CompletableFuture ,一般通过静态方法创建
// 有返回值的,如果没有传线程池默认使用ForkJoinPool.commonPool 这个线程池是个守护线程池。
CompletableFuture<String> value2 = CompletableFuture.supplyAsync(() -> {}, executor);
CompletableFuture<String> value2 = CompletableFuture.supplyAsync(() -> {});
// 无返回值
CompletableFuture<String> value2 = CompletableFuture.runAsync(() -> {}, executor);
CompletableFuture<String> value2 = CompletableFuture.runAsync(() -> {});
CompletableFuture API 可以分为以下几大类。
1、获取结果以及主动触发计算
CompletableFuture<String> value2 = CompletableFuture.supplyAsync(() -> { return "ABC"; });
value2.join(); // 获取执行结果 有异常join会返回unchecked异常
value2.get(); // 获取执行结果 有异常get会返回一个具体的异常
value2.getNow("abc"); // 如果计算完成返回结果值,如果没有则返回给定的值
// 方法会返回布尔值,如果计算完成返回false,未完成返回true 并将参数值作为 get或join方法的返回值。
boolean bba = value2.complete("bba");
2、对计算结果进行链式处理
// 计算结果存在依赖关系,这两个线程串行化,某个步骤有异常不会继续向下执行。
CompletableFuture<Integer> value2 = CompletableFuture.supplyAsync(() -> {
return 1;
}, executor).thenApply(f -> {
return f + 2;
}).thenApply(f -> {
return f + 3;
});
// 计算结果存在依赖关系,这两个线程串行化,某个步骤有异常会带着异常继续执行。
CompletableFuture<Integer> value2 = CompletableFuture.supplyAsync(() -> {
return 1;
}, executor).handle((value, e) -> {
return value + 2;
}).handle((value, e) -> {
return value + 3;
});
3、对计算结果进行消费,无返回结果。
// 最终结果的消费,只消费,无返回,有点像流式编程的终端操作。
CompletableFuture.supplyAsync(() -> {
return 1;
}, executor).thenAccept(result ->{
System.out.println(result);
});
tips:对比 accept、run、apply的区别
thenRun() B不需要A的执行结果 且无返回值
thenAccept() B需要A的执行结果 且无返回值
thenApply() B需要A的执行结果 有返回值
4、计算结果的合并
// 将两个 CompletableFuture 任务的结果进行合并,combine需要指定合并规则
CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
return 1;
});
CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
return 2;
});
CompletableFuture<Integer> result = task1.thenCombine(task2, (x, y) -> {
// 合并规则
return x + y;
});
5、完成时回调、捕获异常
// whenComplete 当执行完成后执行回调函数,exceptionally捕获异常
CompletableFuture<String> value2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName());
return "xxxx";
}, executor).whenComplete((v, e) -> {
if(e == null){
System.out.println(v + "计算完成");
}
}).exceptionally(e -> {
e.getCause();
return null;
});
6、以 Async 结尾的方法
有许多以 Async 结尾的方法,这样的方法都会新启一个线程,如果没有传线程池会使用默认的,会影响链式后续的方法的线程池指定。
八、线程中断
在Java中只有当前线程才能打断自己,可以通过 volatile 、原子类来进行变量的公用。
Thread.interrupted(); // 做了两件事 1.返回线程的中断状态 2 将线程中断状态清零并设置为false
Thread.currentThread().interrupt(); // 将线程中断状态设为true,不会立刻停止当前线程
Thread.currentThread().isInterrupted(); // 返回中断标志位状态,某个结束了的线程也会返回为false
使用interrupt
// A线程一直监听 ,B线程打断A线程
public static void main(String[] args) {
Thread a = new Thread(() -> {
while (true){
if(Thread.currentThread().isInterrupted()){
System.out.println("关掉了");
break;
}
System.out.println("水烧开了快关掉");
}
}, "a");
a.start();
new Thread(a::interrupt, "b").start();
}
PS:如果A线程程处于被阻塞状态(例如调用join sleep wait方法), B线程打断了A线程,此时A线程会抛出 InterruptedException,并将A线程的中断状态设置为false(程序会一直执行下去)