需要了解线程池基础的移步:
线程池总结
手写的线程池比较简陋,只有运行状态以及停止状态,采用内部有些属性采用了Atomi包里面的类来实现,更加方便。
方法说明:
- execute:提交任务方法,先判断核心线程是否达到配置的,如果达到配置的核心线程池配置,那么将任务提交到阻塞队列中,如果阻塞队列已经满了就创建工作线程,工作线程达到了最大线程数那么采用配置的策略进行处理。
- addWorker:在创建前先对线程池状态以及工作线程线程数进行判断;将任务封装成Worker类,Worker内部类,对任务进行封装,并且创建一个线程
- Worker类:继承了ReentrantLock 和 Runable,run方法去执行了线程池中定义的runWorker()方法,将自己传入。
- getTask:当前方法会去循环的取出队列中的任务,如果取的任务为空,就直接跳出while循环
package com.zhj.executors.myexecutor;
import com.zhj.ConcurrentApplication;
import javafx.concurrent.Worker;
import org.springframework.util.Assert;
import java.util.HashSet;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
/**
* ┏┓ ┏┓+ +
* ┏┛┻━━━┛┻┓ + +
* ┃ ┃
* ┃ ━ ┃ ++ + + +
* ████━████ ┃+
* ┃ ┃ +
* ┃ ┻ ┃
* ┃ ┃ + +
* ┗━┓ ┏━┛
* ┃ ┃
* ┃ ┃ + + + +
* ┃ ┃ God beast body, code no BUG
* ┃ ┃ + 神兽护体,代码无BUG
* ┃ ┃
* ┃ ┃ +
* ┃ ┗━━━┓ + +
* ┃ ┣┓
* ┃ ┏┛
* ┗┓┓┏━┳┓┏┛ + + + +
* ┃┫┫ ┃┫┫
* ┗┻┛ ┗┻┛+ + + +
*
* @title: 自定义线程池
* @author zhonghaijun
* @date 2021-03-02
*/
public class ZThreadPoolExecutor {
/**
* 核心线程数
*/
private Integer coreWorker;
/**
* 最大线程数
*/
private Integer maxWorker;
/**
* 工作线程数
*/
private AtomicInteger workerCount;
/**
* 排它锁
*/
private ReentrantLock mainLock = new ReentrantLock();
/**
* 线程池状态
*/
private AtomicBoolean isShutDown = new AtomicBoolean(false);
/**
* 任务队列
*/
private BlockingQueue<Runnable> taskQueue;
/**
* 线程池
*/
private HashSet<Worker> workSet;
/**
* 计算线程完成的数
*/
private long completedTaskCount;
/**
* 是否允许核心线程超时,如果为true,那么线程池最后会收缩至0
*/
private boolean allowCoreThreadTimeOut = false;
/**
* 设置超时时间
*/
private long timeOut;
/**
* 线程池拒绝策略
*/
private MyRejectHandler handler;
public ZThreadPoolExecutor(Integer coreWorker,Integer maxWorker,long timeOut, BlockingQueue<Runnable> taskQueue, MyRejectHandler handler) {
this.coreWorker = coreWorker;
this.maxWorker = maxWorker;
this.workerCount = new AtomicInteger(0);
this.timeOut = timeOut;
this.taskQueue = taskQueue;
this.workSet = new HashSet<>();
this.handler = handler;
}
/**
* 提交线程任务
* @param runnable
*/
public void execute(Runnable runnable){
Assert.notNull(runnable,"任务不能为空");
//比较核心线程是否创建完毕
int workerCount = this.workerCount.get();
if(workerCount < coreWorker){
//创建核心线程
addWorker(runnable,true);
return;
}
boolean status = this.isShutDown.get();
//状态为运行中,并且添加到等待队列成功
if(!status && this.taskQueue.offer(runnable)){
System.out.println("任务入队。。");
status = this.isShutDown.get();
//如果状态是已经关闭,移除任务
if(status && this.taskQueue.remove(runnable)){
//调用拒绝策略
handler.rejectHandler(runnable,this);
}
//如果获取到的工作线程数为0,那么就创建一个默认的
workerCount = this.workerCount.get();
if(workerCount == 0){
addWorker(null,false);
}
}else if(!addWorker(runnable,false)){
//添加失败调用拒绝策略
handler.rejectHandler(runnable,this);
}
}
private Boolean addWorker(Runnable firstTask,Boolean core){
//retry 使用break retry跳过当前标记下继续循环,continue retry表示跳到当前标记下继续循环
retry:
for (;;){
boolean status = isShutDown.get();
if(status){
//判断运行状态
System.out.println("线程池已经关闭,提交任务失败。。");
return false;
}
for (;;){
int workerCount = this.workerCount.get();
//每次添加worker都要判断工作线程数是否要大于核心线程数或者最大线程数
//如果大于核心线程数,返回false,重新创建普通线程;如果大于最大线程数,返回false调用拒绝策略
if(workerCount >= (core ? coreWorker:maxWorker)){
return false;
}
//比较交换,将工作线程数加一
if(this.workerCount.compareAndSet(workerCount,workerCount+1)){
System.out.println("工作线程数:"+this.workerCount.get());
break retry;
}
if(isShutDown.get() != status){
continue retry;
}
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker worker = null;
try {
worker = new Worker(firstTask);
//获取到工作线程本身
Thread thread = worker.thread;
if(thread != null){
try {
//将创建的Worker添加到工作队列中,涉及到对数组的操作,需要加锁
this.mainLock.lock();
boolean status = this.isShutDown.get();
//判断状态是否已经关闭
if(!status){
//判断当前线程是否已经启动,如果启动了就抛出异常
if(thread.isAlive()){
throw new IllegalAccessException(thread.getName()+"线程已经启动,不能再次启动");
}
//添加到工作队列中
workSet.add(worker);
workerAdded = true;
}
} catch (IllegalAccessException e) {
e.printStackTrace();
} finally {
this.mainLock.unlock();
}
//如果被添加成功了就启动线程
if(workerAdded){
//启动工作线程
System.out.println("启动工作线程。。");
thread.start();
workerStarted = true;
}
}
} finally {
if(!workerAdded){
System.out.println("工作线程添加失败");
addWorkerFailed(worker);
}
}
return workerStarted;
}
/**
* 添加工作队列失败,删除掉,并且将工作线程数减一
* @param worker
*/
private void addWorkerFailed(Worker worker){
try {
this.mainLock.lock();
if(worker != null){
this.workSet.remove(worker);
//执行比较交换,减去工作线程
doDecrementWorkerCount();
}
} finally {
this.mainLock.unlock();
}
}
/**
* 真正worker执行的方法
* @param worker
*/
private void runWorker(Worker worker) {
Thread thread = Thread.currentThread();
//获取到第一次的任务
Runnable firstTask = worker.firstTask;
//清空
worker.firstTask = null;
boolean completedAbruptly = true;
try {
//先判断第一次触发的任务是否为空,如果为空则去队列中获取任务,当前是while循环会一直去任务队列中获取数据
while (firstTask != null || (firstTask = getTask()) != null){
worker.lock();
boolean status = this.isShutDown.get();
//判断线程池是否已经关闭,并且线程是否已经中断,如果未中断就调用interrupt设置中断标记
//isInterrupted()判断当前线程是否被标记为中断,但是不会清楚标记;interrupted()判断是否标记为中断,并且会清除标记
if(status && !thread.isInterrupted()){
System.out.println("线程池获取失败,中断任务。。");
thread.interrupt();
}
try {
//直接调用task任务的run方法执行
firstTask.run();
} catch (Exception e){
throw new RuntimeException(e);
} finally {
//清空工作线程中的任务
firstTask = null;
worker.finishTaskNum ++;
worker.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(worker,completedAbruptly);
}
}
private void processWorkerExit(Worker worker, Boolean completedAbruptly) {
//如果任务不是正常的退出,减去工作线程
if(completedAbruptly){
doDecrementWorkerCount();
}
this.mainLock.lock();
try {
//线程池记录总共完成的任务数量
completedTaskCount += worker.finishTaskNum;
//移除掉worker
workSet.remove(worker);
} finally {
this.mainLock.unlock();
}
//如果线程池没有关闭,并且队列不为空时,创建新的工作线程
boolean status = this.isShutDown.get();
if(status){
System.out.println("线程池已经关闭,停止创建工作线程,工作线程数:"+this.workerCount.get());
}
if(!status){
if(!completedAbruptly){
//如果允许核心线程超时,那么线程池存在的线程最小值为0 否则最小值就是核心线程数
int min = allowCoreThreadTimeOut ? 0 : coreWorker;
//如果最小值为0,并且任务队列中还有任务存在,那么线程池数量保持为1
if(min == 0 && !taskQueue.isEmpty()){
min = 1;
}
//如果工作线程还大于最小值的话,直接返回不用创建
if(this.workerCount.get() >= min){
return;
}
}
addWorker(null,false);
}
}
/**
* 任务队列中获取任务
* @return
*/
private Runnable getTask() {
boolean isTimeout = false;
for (;;){
boolean status = this.isShutDown.get();
if(status){
System.out.println("线程池已经关闭,停止获取任务");
return null;
}
try {
//是否允许核心线程超时,工作线程是否大于了核心线程
boolean timed = allowCoreThreadTimeOut || this.workerCount.get() > coreWorker;
//工作线程进行收缩,如果允许核心线程超时,那么收缩时核心线程也会删除掉
if((this.workerCount.get() > maxWorker || (isTimeout && timed)) && (this.workerCount.get() > 1 || taskQueue.isEmpty())){
if(this.workerCount.compareAndSet(this.workerCount.get(),this.workerCount.get() - 1)){
System.out.println("线程池进行收缩,收缩后工作线程:"+ this.workerCount.get());
return null;
}
continue;
}
//take会阻塞并且释放cpu资源,poll则会根据等待的时间来一直阻塞但是不会释放cpu资源
Runnable r = timed ? taskQueue.poll(timeOut, TimeUnit.SECONDS) : taskQueue.take();
if(r != null){
return r;
}
isTimeout = true;
} catch (InterruptedException e) {
}
}
}
public void shutdownNow(){
while (!this.isShutDown.compareAndSet(this.isShutDown.get(),Boolean.TRUE)){
break;
}
}
private void doDecrementWorkerCount(){
do {}while (!this.workerCount.compareAndSet(workerCount.get(),workerCount.get() - 1));
}
private void doIncrementWorkerCount(){
do {}while (!this.workerCount.compareAndSet(workerCount.get(),workerCount.get() + 1));
}
private final class Worker extends ReentrantLock implements Runnable{
/**
* 第一次触发创建时的任务
*/
private Runnable firstTask;
/**
* 工作线程,worker运行的线程
*/
private Thread thread;
/**
* 已经处理了的任务
*/
private volatile long finishTaskNum;
public Worker(Runnable firstTask){
this.firstTask = firstTask;
//将worker本身包装进thread线程中
this.thread = new Thread(this,"zhj_"+ UUID.randomUUID().toString());
}
@Override
public void run() {
//调用Executors中定义的runWorker方法
runWorker(this);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Worker worker = (Worker) o;
return Objects.equals(firstTask, worker.firstTask) &&
Objects.equals(thread, worker.thread) &&
Objects.equals(finishTaskNum, worker.finishTaskNum);
}
@Override
public int hashCode() {
return Objects.hash(firstTask, thread, finishTaskNum);
}
}
}
拒绝策略,实现的比较简陋,大家可以根据线程池的自行实现。
public interface MyRejectHandler {
/**
* 拒绝策略
* @param runnable
* @param zThreadPoolExecutor
*/
void rejectHandler(Runnable runnable,ZThreadPoolExecutor zThreadPoolExecutor);
}
public class Rejected1Handler implements MyRejectHandler {
@Override
public void rejectHandler(Runnable runnable, ZThreadPoolExecutor zThreadPoolExecutor) {
System.out.println("丢弃任务....");
}
}
测试方法
public static void main( String[] args ) throws InterruptedException {
commitTask();
Thread.sleep(20000);
}
public static void commitTask(){
//阻塞数组队列
//ZThreadPoolExecutor zThreadPoolExecutor = new ZThreadPoolExecutor(2,3,1L,new ArrayBlockingQueue<>(5),new Rejected1Handler());
DelayQueue delayTasks = new DelayQueue<>();
ZThreadPoolExecutor zThreadPoolExecutor = new ZThreadPoolExecutor(2,3,1L, delayTasks,new Rejected1Handler());
for (int i = 0; i < 100; i++) {
zThreadPoolExecutor.execute(new DelayCreator.DelayTask("任务"+i, Long.parseLong(i+"000")));
if(i == 20){
//关闭线程
zThreadPoolExecutor.shutdownNow();
}
}
}
public class ZTask implements Runnable {
private String name;
public ZTask(String name) {
this.name = name;
}
@Override
public void run() {
try {
Thread.sleep(10000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("自定义线程池提交。。。"+name);
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
版权声明:本文为weixin_43915643原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。