手写线程池

  • Post author:
  • Post category:其他


需要了解线程池基础的移步:

线程池总结

手写的线程池比较简陋,只有运行状态以及停止状态,采用内部有些属性采用了Atomi包里面的类来实现,更加方便。

方法说明:

  • execute:提交任务方法,先判断核心线程是否达到配置的,如果达到配置的核心线程池配置,那么将任务提交到阻塞队列中,如果阻塞队列已经满了就创建工作线程,工作线程达到了最大线程数那么采用配置的策略进行处理。
  • addWorker:在创建前先对线程池状态以及工作线程线程数进行判断;将任务封装成Worker类,Worker内部类,对任务进行封装,并且创建一个线程
  • Worker类:继承了ReentrantLock 和 Runable,run方法去执行了线程池中定义的runWorker()方法,将自己传入。
  • getTask:当前方法会去循环的取出队列中的任务,如果取的任务为空,就直接跳出while循环
package com.zhj.executors.myexecutor;

import com.zhj.ConcurrentApplication;
import javafx.concurrent.Worker;
import org.springframework.util.Assert;

import java.util.HashSet;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

/**
 *           ┏┓    ┏┓+ +
 *        ┏┛┻━━━┛┻┓ + +
 *        ┃       ┃
 *        ┃   ━   ┃ ++ + + +
 *        ████━████ ┃+
 *        ┃       ┃ +
 *        ┃   ┻   ┃
 *        ┃       ┃ + +
 *        ┗━┓   ┏━┛
 *          ┃   ┃
 *          ┃   ┃ + + + +
 *          ┃   ┃			God beast body, code no BUG
 *          ┃   ┃ +			神兽护体,代码无BUG
 *          ┃   ┃
 *          ┃   ┃  +
 *          ┃    ┗━━━┓ + +
 *          ┃        ┣┓
 *          ┃        ┏┛
 *          ┗┓┓┏━┳┓┏┛ + + + +
 *           ┃┫┫ ┃┫┫
 *           ┗┻┛ ┗┻┛+ + + +
 *
 * @title: 自定义线程池
 * @author zhonghaijun
 * @date 2021-03-02
 */
public class ZThreadPoolExecutor {

    /**
     * 核心线程数
     */
    private Integer coreWorker;

    /**
     * 最大线程数
     */
    private Integer maxWorker;


    /**
     * 工作线程数
     */
    private AtomicInteger workerCount;

    /**
     * 排它锁
     */
    private ReentrantLock mainLock = new ReentrantLock();

    /**
     * 线程池状态
     */
    private AtomicBoolean isShutDown = new AtomicBoolean(false);


    /**
     * 任务队列
     */
    private BlockingQueue<Runnable> taskQueue;

    /**
     * 线程池
     */
    private HashSet<Worker> workSet;

    /**
     * 计算线程完成的数
     */
    private long completedTaskCount;

    /**
     * 是否允许核心线程超时,如果为true,那么线程池最后会收缩至0
     */
    private boolean allowCoreThreadTimeOut = false;

    /**
     * 设置超时时间
     */
    private long timeOut;


    /**
     * 线程池拒绝策略
     */
    private MyRejectHandler handler;


    public ZThreadPoolExecutor(Integer coreWorker,Integer maxWorker,long timeOut, BlockingQueue<Runnable> taskQueue, MyRejectHandler handler) {
        this.coreWorker = coreWorker;
        this.maxWorker = maxWorker;
        this.workerCount = new AtomicInteger(0);
        this.timeOut = timeOut;
        this.taskQueue = taskQueue;
        this.workSet = new HashSet<>();
        this.handler = handler;
    }

    /**
     * 提交线程任务
     * @param runnable
     */
    public void execute(Runnable runnable){
        Assert.notNull(runnable,"任务不能为空");
        //比较核心线程是否创建完毕
        int workerCount = this.workerCount.get();
        if(workerCount < coreWorker){
            //创建核心线程
            addWorker(runnable,true);
            return;
        }
        boolean status = this.isShutDown.get();
        //状态为运行中,并且添加到等待队列成功
        if(!status && this.taskQueue.offer(runnable)){
            System.out.println("任务入队。。");
            status = this.isShutDown.get();
            //如果状态是已经关闭,移除任务
            if(status && this.taskQueue.remove(runnable)){
                //调用拒绝策略
                handler.rejectHandler(runnable,this);
            }
            //如果获取到的工作线程数为0,那么就创建一个默认的
            workerCount = this.workerCount.get();
            if(workerCount == 0){
                addWorker(null,false);
            }
        }else if(!addWorker(runnable,false)){
            //添加失败调用拒绝策略
            handler.rejectHandler(runnable,this);
        }

    }

    private Boolean addWorker(Runnable firstTask,Boolean core){
        //retry 使用break retry跳过当前标记下继续循环,continue retry表示跳到当前标记下继续循环
        retry:
        for (;;){
            boolean status = isShutDown.get();
            if(status){
                //判断运行状态
                System.out.println("线程池已经关闭,提交任务失败。。");
                return false;
            }
            for (;;){
                int workerCount = this.workerCount.get();
                //每次添加worker都要判断工作线程数是否要大于核心线程数或者最大线程数
                //如果大于核心线程数,返回false,重新创建普通线程;如果大于最大线程数,返回false调用拒绝策略
                if(workerCount >= (core ? coreWorker:maxWorker)){
                    return false;
                }
                //比较交换,将工作线程数加一
                if(this.workerCount.compareAndSet(workerCount,workerCount+1)){
                    System.out.println("工作线程数:"+this.workerCount.get());
                    break retry;
                }
                if(isShutDown.get() != status){
                    continue retry;
                }
            }
        }
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker worker = null;
        try {
            worker = new Worker(firstTask);
            //获取到工作线程本身
            Thread thread = worker.thread;
            if(thread != null){
                try {
                    //将创建的Worker添加到工作队列中,涉及到对数组的操作,需要加锁
                    this.mainLock.lock();
                    boolean status = this.isShutDown.get();
                    //判断状态是否已经关闭
                    if(!status){
                        //判断当前线程是否已经启动,如果启动了就抛出异常
                        if(thread.isAlive()){
                            throw new IllegalAccessException(thread.getName()+"线程已经启动,不能再次启动");
                        }
                        //添加到工作队列中
                        workSet.add(worker);
                        workerAdded = true;
                    }
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } finally {
                    this.mainLock.unlock();
                }
                //如果被添加成功了就启动线程
                if(workerAdded){
                    //启动工作线程
                    System.out.println("启动工作线程。。");
                    thread.start();
                    workerStarted = true;
                }
            }
        } finally {
            if(!workerAdded){
                System.out.println("工作线程添加失败");
                addWorkerFailed(worker);
            }
        }
        return workerStarted;
    }

    /**
     * 添加工作队列失败,删除掉,并且将工作线程数减一
     * @param worker
     */
    private void addWorkerFailed(Worker worker){
        try {
            this.mainLock.lock();
            if(worker != null){
                this.workSet.remove(worker);
                //执行比较交换,减去工作线程
                doDecrementWorkerCount();
            }
        } finally {
            this.mainLock.unlock();
        }
    }

    /**
     * 真正worker执行的方法
     * @param worker
     */
    private void runWorker(Worker worker) {
        Thread thread = Thread.currentThread();
        //获取到第一次的任务
        Runnable firstTask = worker.firstTask;
        //清空
        worker.firstTask = null;
        boolean completedAbruptly = true;
        try {
            //先判断第一次触发的任务是否为空,如果为空则去队列中获取任务,当前是while循环会一直去任务队列中获取数据
            while (firstTask != null || (firstTask = getTask()) != null){
                worker.lock();
                boolean status = this.isShutDown.get();
                //判断线程池是否已经关闭,并且线程是否已经中断,如果未中断就调用interrupt设置中断标记
                //isInterrupted()判断当前线程是否被标记为中断,但是不会清楚标记;interrupted()判断是否标记为中断,并且会清除标记
                if(status && !thread.isInterrupted()){
                    System.out.println("线程池获取失败,中断任务。。");
                    thread.interrupt();
                }
                try {
                    //直接调用task任务的run方法执行
                    firstTask.run();
                } catch (Exception e){
                    throw new RuntimeException(e);
                } finally {
                    //清空工作线程中的任务
                    firstTask = null;
                    worker.finishTaskNum ++;
                    worker.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(worker,completedAbruptly);
        }
    }

    private void processWorkerExit(Worker worker, Boolean completedAbruptly) {
        //如果任务不是正常的退出,减去工作线程
        if(completedAbruptly){
            doDecrementWorkerCount();
        }
        this.mainLock.lock();
        try {
            //线程池记录总共完成的任务数量
            completedTaskCount += worker.finishTaskNum;
            //移除掉worker
            workSet.remove(worker);
        } finally {
            this.mainLock.unlock();
        }
        //如果线程池没有关闭,并且队列不为空时,创建新的工作线程
        boolean status = this.isShutDown.get();
        if(status){
            System.out.println("线程池已经关闭,停止创建工作线程,工作线程数:"+this.workerCount.get());
        }
        if(!status){
            if(!completedAbruptly){
                //如果允许核心线程超时,那么线程池存在的线程最小值为0 否则最小值就是核心线程数
                int min = allowCoreThreadTimeOut ? 0 : coreWorker;
                //如果最小值为0,并且任务队列中还有任务存在,那么线程池数量保持为1
                if(min == 0 && !taskQueue.isEmpty()){
                    min = 1;
                }
                //如果工作线程还大于最小值的话,直接返回不用创建
                if(this.workerCount.get() >= min){
                    return;
                }
            }
            addWorker(null,false);
        }
    }

    /**
     * 任务队列中获取任务
     * @return
     */
    private Runnable getTask() {
        boolean isTimeout = false;
        for (;;){
            boolean status = this.isShutDown.get();
            if(status){
                System.out.println("线程池已经关闭,停止获取任务");
                return null;
            }
            try {
                //是否允许核心线程超时,工作线程是否大于了核心线程
                boolean timed = allowCoreThreadTimeOut || this.workerCount.get() > coreWorker;

                //工作线程进行收缩,如果允许核心线程超时,那么收缩时核心线程也会删除掉
                if((this.workerCount.get() > maxWorker || (isTimeout && timed)) && (this.workerCount.get() > 1 || taskQueue.isEmpty())){
                    if(this.workerCount.compareAndSet(this.workerCount.get(),this.workerCount.get() - 1)){
                        System.out.println("线程池进行收缩,收缩后工作线程:"+ this.workerCount.get());
                        return null;
                    }
                    continue;
                }
                //take会阻塞并且释放cpu资源,poll则会根据等待的时间来一直阻塞但是不会释放cpu资源
                Runnable r = timed ? taskQueue.poll(timeOut, TimeUnit.SECONDS) : taskQueue.take();
                if(r != null){
                    return r;
                }
                isTimeout = true;
            } catch (InterruptedException e) {

            }
        }
    }

    public void shutdownNow(){
        while (!this.isShutDown.compareAndSet(this.isShutDown.get(),Boolean.TRUE)){
            break;
        }
    }

    private void doDecrementWorkerCount(){
        do {}while (!this.workerCount.compareAndSet(workerCount.get(),workerCount.get() - 1));
    }

    private void doIncrementWorkerCount(){
        do {}while (!this.workerCount.compareAndSet(workerCount.get(),workerCount.get() + 1));
    }



    private final class Worker extends ReentrantLock implements Runnable{

        /**
         * 第一次触发创建时的任务
         */
        private Runnable firstTask;

        /**
         * 工作线程,worker运行的线程
         */
        private Thread thread;

        /**
         * 已经处理了的任务
         */
        private volatile long finishTaskNum;

        public Worker(Runnable firstTask){
            this.firstTask = firstTask;
            //将worker本身包装进thread线程中
            this.thread = new Thread(this,"zhj_"+ UUID.randomUUID().toString());
        }


        @Override
        public void run() {
            //调用Executors中定义的runWorker方法
            runWorker(this);
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            Worker worker = (Worker) o;
            return Objects.equals(firstTask, worker.firstTask) &&
                    Objects.equals(thread, worker.thread) &&
                    Objects.equals(finishTaskNum, worker.finishTaskNum);
        }

        @Override
        public int hashCode() {
            return Objects.hash(firstTask, thread, finishTaskNum);
        }
    }
}

拒绝策略,实现的比较简陋,大家可以根据线程池的自行实现。


public interface MyRejectHandler {

    /**
     * 拒绝策略
     * @param runnable
     * @param zThreadPoolExecutor
     */
    void rejectHandler(Runnable runnable,ZThreadPoolExecutor zThreadPoolExecutor);
}

public class Rejected1Handler implements MyRejectHandler {

    @Override
    public void rejectHandler(Runnable runnable, ZThreadPoolExecutor zThreadPoolExecutor) {
        System.out.println("丢弃任务....");
    }
}

测试方法

public static void main( String[] args ) throws InterruptedException {
        commitTask();
        Thread.sleep(20000);
}

public static void commitTask(){
        //阻塞数组队列
        //ZThreadPoolExecutor zThreadPoolExecutor = new ZThreadPoolExecutor(2,3,1L,new ArrayBlockingQueue<>(5),new Rejected1Handler());
        DelayQueue delayTasks = new DelayQueue<>();
        ZThreadPoolExecutor zThreadPoolExecutor = new ZThreadPoolExecutor(2,3,1L, delayTasks,new Rejected1Handler());
        for (int i = 0; i < 100; i++) {
            zThreadPoolExecutor.execute(new DelayCreator.DelayTask("任务"+i, Long.parseLong(i+"000")));
            if(i == 20){
                //关闭线程
                zThreadPoolExecutor.shutdownNow();
            }
        }
    }
public  class ZTask implements Runnable {

        private String name;

        public ZTask(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(10000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("自定义线程池提交。。。"+name);
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }
    }



版权声明:本文为weixin_43915643原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。