Java自定义线程池详解及代码实现(非直接调用ThreadPoolExecutor)

  • Post author:
  • Post category:java


要实现自定义的线程池,首先得了解线程池的工作流程。

我们可以参考JDK中自定的线程池工作流程去理解,并实现其简化版本。



JDK中的线程池函数ThreadPoolExecutor

JDK中实现线程池的函数如下,其中包含了7个参数。

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) 

这7个参数的含义如下:

在这里插入图片描述


  • corePoolSize

    表示线程池中的核心线程数,核心线程为常驻线程池中的线程的个数。

  • maximumPoolSize

    表示最大线程数,其中救急线程个数等于

    maximumPoolSize - corePoolSize

    ,救急线程为线程池核心线程已满,并且阻塞队列已满时使用的线程,救急线程并不会常驻线程池,其有空闲存活时间,可以通过之后的参数设定。

  • keepAliveTime

    表示救急线程空闲存活时间,当救急线程没有任务时,等待

    keepAliveTime

    之后还没有任务,则消亡。

  • unit

    的时间单位。

  • workQueue

    阻塞队列,当线程池中的核心线程都在工作时,之后来的任务会让在队列中等待执行。

  • threadFactory

    创建线程的工厂,一般用来为线程设定可辨识的名字。

  • handler

    拒绝策略,当线程池无法执行之后来的任务的处理策略。一般包括 1)

    AbortPolicy

    让调用者抛出

    RejectedExecutionException

    异常,这是默认策略 2)

    CallerRunsPolicy

    让调用者运行任务 3)

    DiscardPolicy

    放弃本次任务 4)

    DiscardOldestPolicy

    放弃队列中最早的任务,本任务取而代之。我们也可以实现自己拒绝策略: 1) 死等 2) 带超时等待 3) 让调用者放弃任务执行 4) 让调用者抛出异常 5) 让调用者自己执行任务等。



JDK中的线程池执行任务时的流程

当我们调用JDK中的线程池执行任务时,其流程一般如下:

  1. 当我们想向线程池提交任务时,如果线程池的线程数中小于核心线程数时,线程池新建核心线程,任务交给核心线程执行。
  2. 如果线程池中线程数等于核心线程数,则将后来的任务放入到阻塞队列中,等待核心线程执行完任务之后,从队列中取任务执行。
  3. 当核心线程都在工作,并且阻塞队列已满时,则创建救急线程,可创建的救急线程数为

    maximumPoolSize - corePoolSize

    ,将任务交给救急线程执行。
  4. 当核心线程已满、阻塞队列已满,救急线程已满时,则执行根据所提供的拒绝策略对后来的任务进行处理。



自定义线程池业务分析

我们可以对上述流程进行分析,简化处理去除掉救急线程,来完成自定义的线程池。


  1. 定义阻塞队列

    。通过对线程池的业务分析,我们可以发现,当线程池中的线程都在工作时,后续的任务需要放置到阻塞队列中。之后线程池中的线程都是从阻塞队列中获取任务执行,新来的任务也是被放置到阻塞队列中。

    此处的工作方式,为生产者消费者模式

    。因为阻塞队列为多线程中的共享资源,所以需要加锁以确保共享资源的安全性。

  2. 阻塞队列的实现分析

    。首先需要定义队列容量,定义双端队列来存储任务,此处任务类型可定义为泛型。之后需要定义两种方法,分别是

    任务存入队列的方法和从队列中获取任务的方法

    。在存和取得过程中使用可重入锁进行加锁判断。在取任务的过程中,加锁,如果过队列为空,则阻塞等待,也可以设置为带超时的阻塞等待,后续会附上代码,否则直接从队列中返回任务对象。在存任务的过程中,加锁,如果队列为满,则阻塞等待,也可以设置为带超时的阻塞等待,如果不为空,则存入。具体见之后的代码实现。

  3. 定义线程池

    。首先需要设定核心线程数的大小,定义集合对象存储已经创建的线程,方便之后根据集合对象获取已创建的线程的个数。当新来任务时,如果集合的大小小于核心线程数则新建线程,执行任务。否则当核心线程已经全部工作时,需要将新来的任务放入阻塞队列中,等待线程执行完毕从阻塞队列中获取任务,相当于生产者与消费者模式中的消费者。

  4. 定义线程对象

    。该线程对象继承自

    Thread

    类,并重写其

    run()

    方法。该线程对象即为线程池中的线程。我们在其

    run()

    方法中,编写线程执行任务的过程,任务我们定义为

    Runnable

    对象,通过调用在线程对象中

    run()

    方法中调用

    Runnable

    对象的

    run

    方法,执行任务,当执行完毕时候,该线程对象继续尝试从阻塞队列中获取任务,可以阻塞获取(即一直等待,有任务就执行,没任务就等待),或者超时等待(当超时之后,直接放弃获取)。

  5. 定义拒绝策略

    。拒绝策略通过使用策略模型实现,我们只定义拒绝策略的接口,具体逻辑通过调用者实现。拒绝策略的使用情况。当阻塞队列已满时,如果还有新来的任务,则使用拒绝策略进行处理。当队列未满则直接添加。

以上就是自定义线程池所需要的对象方法,接下来我们使用Java代码一一实现。



自定义线程池的代码实现-注释详尽

我们将任务定义为

Runnable

对象,线程池中的线程对象为

Thread

的子类,这样就可以将

Runnable

对象,传给

Thread

进行处理。当然其他实现也可以,可以自行尝试。



1.定义阻塞队列

class BlockingQueue<T>{
    // 1.任务队列, 双向队列
    private Deque<T> queue = new ArrayDeque<>();

    // 2.锁
    private ReentrantLock lock = new ReentrantLock();

    // 3.生产者条件变量
    private Condition fullWaitSet = lock.newCondition();

    // 4.消费者条件变量
    private Condition emptyWaitSet = lock.newCondition();

    // 5.容量
    private int capacity;

    public BlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    // 超时阻塞获取任务
    public T pull(long timeout, TimeUnit unit){
        lock.lock();
        try{
            // 将超时时间统一转换为纳秒
            long nanos = unit.toNanos(timeout);
            // 取任务的时候,如果为空则需要等待
            while(queue.isEmpty()){
                // 超时的情况直接返回null
                if(nanos <= 0){
                    return null;
                }
                // 返回的是剩余的时间
                nanos = emptyWaitSet.awaitNanos(nanos);
            }
            T t = queue.removeFirst();
            // 唤醒放入的线程
            fullWaitSet.signal();
            return t;
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
    }

    // 阻塞获取任务
    public T take(){
        // 加锁
        lock.lock();
        try{
            // 取任务的时候,如果为空则需要等待
            while(queue.isEmpty()){
                emptyWaitSet.await();
            }
            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
    }

    // 带超时时间的阻塞添加任务
    public boolean offer(T task, long timeout, TimeUnit timeUnit){
        lock.lock();
        try{
            long nanos = timeUnit.toNanos(timeout);
            // 添加任务时,如果队列已满则需要等待
            while(queue.size()==capacity ){
                System.out.println(task.toString() + " 等待加入任务队列" );
                if(nanos<=0){
                    return false;
                }
                nanos = fullWaitSet.awaitNanos(nanos);
            }
            queue.addLast(task);
            System.out.println("任务【" + task.toString() +  "】加入队列 " );
            emptyWaitSet.signal();
            return true;
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
    }

    // 阻塞添加任务
    public void put(T task){
        lock.lock();
        try{
            while(queue.size()==capacity){
                System.out.println(task.toString() + " 等待加入任务队列" );
                fullWaitSet.await();
            }
            queue.addLast(task);
            System.out.println("任务【" + task.toString() +  "】加入队列 " );
            emptyWaitSet.signal();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
    }

    // 获取队列大小
    public int size(){
       lock.lock();
       try{
           return queue.size();
       }finally {
           lock.unlock();
       }
    }
    
    // 为使用拒绝策略所添加的向队列中添加任务的方法
    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try{
            // 队列已满
            if(queue.size()==capacity){
                rejectPolicy.reject(this,task);
            }else{ // 有空闲
                queue.addLast(task);
                System.out.println("任务【" + task.toString() +  "】加入队列 " );
                emptyWaitSet.signal();
            }

        }finally {
            lock.unlock();
        }
    }
}



2.定义线程池以及线程对象内部类

class ThreadPool{
    // 任务队列
    private BlockingQueue<Runnable> taskQueue;

    // 线程集合
    private HashSet<Worker> workers = new HashSet<>();

    // 核心线程数
    private int coreSize;

    // 获取任务的超时时间,时间单位,当从队列中获取超时时,放弃获取
    private long timeout;
    private TimeUnit timeUnit;

    // 拒绝策略
    private RejectPolicy<Runnable> rejectPolicy;

    // 线程池传入任务的方法
    public void execute(Runnable task){
        // 当任务数没有超过coreSize,直接交给worker对象执行
        // 如果任务数超过coreSize时,加入任务队列
        // 因为集合workers为共享变量,所以此处也需要加锁
        synchronized (workers){
            if(workers.size() < coreSize){
                Worker worker = new Worker(task);
                System.out.println("新增worker " + worker.toString() + " 任务 " + task.toString());
                workers.add(worker);
                worker.start();
            }else{
                taskQueue.tryPut(rejectPolicy, task);
            }
        }
    }

    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapacity, RejectPolicy<Runnable> rejectPolicy) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.taskQueue = new BlockingQueue<>(queueCapacity);
        this.rejectPolicy = rejectPolicy;
    }

    class Worker extends Thread{
        private Runnable task;

        private Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            // 执行任务
            // 1.当task不为空,则执行任务
            // 2.当task执行完毕,接着去任务队列中获取并执行
            // 此处使用了短路逻辑
            while(task !=null || (task = taskQueue.pull(timeout, timeUnit)) !=null ){
                try{
                    System.out.println("正在执行: " + task.toString());
                    task.run();
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    task = null;
                }
            }
            // 超时获取时,如果未获取到任务,则结束该线程
            synchronized (workers){
                System.out.println("worker 移除:" + this.toString());
                workers.remove(this);
            }
        }
    }
}



3.定义拒绝策略,只定义接口,之后策略由调用者传入。

interface RejectPolicy<T>{
    void reject(BlockingQueue<T> queue, T task);
}



自定义线程池代码测试

public class MyThreadPoolTest {
    public static void main(String[] args) {
         // 定义线程池,传入参数为线程数,超时时间(当获取任务时间超过改时间时,结果等待)
         // 时间单位, 队列容量,拒绝策略,此处出lambda表达式,因为我们实现的拒绝策略只有一个接口,所以可以这样写
         ThreadPool pool = new ThreadPool(1, 1000,
                 TimeUnit.MICROSECONDS, 1,(queue,task)->{
                 // 1.死等
                 // queue.put(task);
                 // 2.带超时的等待
                 // queue.offer(task, 1500, TimeUnit.MILLISECONDS);
                 // 3.让调用者放弃任务执行
                 // System.out.println("队列已满放弃等待");
                 // 4.抛出异常
                 throw new RuntimeException("任务执行失败,队列已满" + task);
                 // 5.自己执行
                 // task.run();
         });
        
         // 给线程池提交任务,循环3次,任务为打印,执行每次失眠一秒
        for (int i = 0; i < 3; i++) {
            int id = i+1;
            pool.execute(()->{
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println(Thread.currentThread().toString()+ " " + id);
            });
        }
    }
}


测试结果;


在这里插入图片描述


结果分析;

我们创建了线程个数为1的线程池,并且阻塞队列也为1,拒绝策略为直接抛出异常。当有三个任务时,我们可以看到刚开始第一个任务

JUC.MyThreadPoolTest$$Lambda$2/2074407503@4dd8dc3

到来,线程池创建了线程对象,第二个任务

JUC.MyThreadPoolTest$$Lambda$2/2074407503@568db2f2

加入的阻塞队列,第一个任务执行。当第三个任务来时,因为线程池中线程正忙,阻塞队列已满,所以根据拒绝策略直接抛出了异常。当两个任务执行完毕之后,线程池中的线程尝试从阻塞队列中继续超时获取,但是超时之后未获取到,所以直接结束,并删除了线程池中的线程,任务结束。

其他的情况,可以自行尝试。



版权声明:本文为qq_36944952原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。