六、03【Java 多线程】之Java线程

  • Post author:
  • Post category:java


Java 创建线程的方式

Java创建线程有四种方式:

  • 继承 Thread 类
  • 实现 Runnable 接口
  • 实现 Callable 接口
  • 使用 Executors 工具类创建线程池


1)继承 Thread 类

创建一个类继承 Thread,重新run()方法。run() 方法就是线程要执行的业务逻辑方法。

将该类进行实例化,调用 star() 方法来启动线程。

public class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " run()方法正在执行...");
    }
}

public class ThreadTest {

    public static void main(String[] args) {
        MyThread myThread = new MyThread();
        myThread.start();
        System.out.println(Thread.currentThread().getName() + " main()方法执行结束");
    }
}

执行结果:
main main()方法执行结束
Thread-0 run()方法正在执行...


2)实现 Runnable 接口

创建一个类实现 Runnable 接口,并重写run()方法。

将该类的实例作为 target 创建Thead对象,该Thread对象才是真正的线程对象。

调用 star() 方法来启动线程。

public class MyRunnable implements Runnable{
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " run()方法正在执行...");
    }
}

public class ThreadTest {
    public static void main(String[] args) {
        MyRunnable myRunnable = new MyRunnable();
        Thread thread = new Thread(myRunnable);
        thread.start();
        System.out.println(Thread.currentThread().getName() + " main()方法执行结束");
    }
}

执行结果:
main main()方法执行结束
Thread-0 run()方法正在执行...


3)实现 Callable 接口

创建实现Callable接口的类MyCallable,以myCallable为参数创建FutureTask对象,

将FutureTask作为参数创建Thread对象,调用线程对象的 start() 方法。

public class MyCallable implements Callable<Integer> {
    @Override
    public Integer call() {
        System.out.println(Thread.currentThread().getName() + " call()方法执行中...");
        return 1;
    }
}

public class ThreadTest {
    public static void main(String[] args) {
        FutureTask<Integer> futureTask = new FutureTask<>(new MyCallable());
        Thread thread = new Thread(futureTask);
        thread.start();
        try {
            Thread.sleep(1000);
            System.out.println("返回结果 " + futureTask.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " main()方法执行完成");
    }
}

执行结果:
Thread-0 call()方法执行中...
返回结果 1
main main()方法执行完成


4)使用 Executors 工具类创建线程池

Executors提供了一系列工厂方法用于创先线程池,返回的线程池都实现了ExecutorService接口。

主要有newFixedThreadPool,newCachedThreadPool,newSingleThreadExecutor,newScheduledThreadPool 四种

public class MyRunnable implements Runnable {
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " run()方法执行中...");
    }
}

public static void main(String[] args) {
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    MyRunnable myRunnable = new MyRunnable();
    for (int i = 0; i < 5; i++) {
        executorService.execute(myRunnable);
    }
    System.out.println("线程任务开始执行...");
    executorService.shutdown();
}

执行结果:
线程任务开始执行...
pool-1-thread-1 run()方法正在执行...
pool-1-thread-1 run()方法正在执行...
pool-1-thread-1 run()方法正在执行...
pool-1-thread-1 run()方法正在执行...
pool-1-thread-1 run()方法正在执行...


public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(5);
    MyRunnable myRunnable = new MyRunnable();
    for (int i = 0; i < 5; i++) {
        executorService.execute(myRunnable);
    }
    System.out.println("线程任务开始执行...");
    executorService.shutdown();
}

执行结果:
线程任务开始执行...
pool-1-thread-1 run()方法正在执行...
pool-1-thread-3 run()方法正在执行...
pool-1-thread-2 run()方法正在执行...
pool-1-thread-4 run()方法正在执行...
pool-1-thread-5 run()方法正在执行...

上面实现多线程的几种方式各有优点:使用继承的优点就是方便传参,可以在子类添加成员变量,通过set方法或构造函数进行设置参数,如果使用Runable的方式,则只能使用主线程里面被声明为final的变量。使用继承的缺点就是,如果继承了Thread类,那么就不能继承其他类了,因为Java里面不支持多继承,而Runable则不会。这两种方式都没办法拿到返回结果,但是实现Callable使用FutureTask则可以拿到返回结果。

Thread 类

Thread 是程序中执行的线程。 Java虚拟机允许应用程序具有多个并发运行的执行线程。

每个线程都有优先级。优先级较高的线程优先于优先级较低的线程执行。每个线程也可以标记为守护进程,也可以不标记为守护进程。当在某个线程中运行的代码创建新的线程对象时,新线程的优先级最初设置为与创建线程的优先级相等,并且仅当创建线程是守护进程时,该线程才是守护进程线程。

当Java虚拟机启动时,通常只有一个非守护进程线程(它通常调用某个指定类的main方法)。Java虚拟机将继续执行线程,直到发生以下任一情况:

1)类运行时的exit方法已被调用,并且安全管理器已允许执行exit操作。

2)所有不是守护进程线程的线程都已死亡,要么是通过调用run方法返回,要么是通过抛出传播到run方法之外的异常。

每个线程都有一个名称用于标识。多个线程可以有相同的名称。如果在创建线程时没有指定名称,则会为其生成一个新名称。

除非另有说明,若将null参数传递给此类中的构造函数或方法将导致引发NullPointerException。

Java 线程依赖于此类。


属性&构造函数

// 线程是否为守护线程。
private boolean daemon = false;
// jvm 状态
private boolean stillborn = false;
// 线程将会运行什么
private Runnable target;
// 这个线程组
private ThreadGroup group;
// 线程的ID
private long tid;
// 线程状态,初始化以表示线程“尚未启动”
private volatile int threadStatus = 0;
// ....
// 线程可以拥有的最小优先级。
public final static int MIN_PRIORITY = 1;
// 分配给线程的默认优先级。
public final static int NORM_PRIORITY = 5;
// 线程可以拥有的最大优先级。
public final static int MAX_PRIORITY = 10;

// 构造函数
// 创建一个新的线程对象。此构造函数与Thread(ThreadGroup,Runnable,String)具有相同的效果。全部使用默认。
public Thread() {}
// 创建一个新的线程对象。指定线程运行目标,与Thread(ThreadGroup,Runnable,String)具有相同的效果。默认线程名:“Thread-”+n,n为整数。
public Thread(Runnable target){}
// 创建一个新的线程对象。指定线程组和线程运行目标,与Thread(ThreadGroup,Runnable,String)具有相同的效果。使用默认线程名。
public Thread(ThreadGroup group,Runnable target){}
// 创建一个新的线程对象。并指定这个线程的名称,与Thread(ThreadGroup,Runnable,String)相同的效果。
public Thread(String name){}
// 创建一个新的线程对象。指定这个线程的线程组和线程名称,这个构造函数具有与Thread(ThreadGroup,Runnable,String)相同的效果。
public Thread(ThreadGroup group,String name){}
// 创建一个新的线程对象。指定线程运行目标和线程名。
public Thread(Runnable target,String name){}
// 创建一个新的线程对象。指定线程组、线程运行目标和线程名。如果有安全管理器,则使用ThreadGroup作为参数调用它的checkAccess方法。
// 此外,当重写getContextClassLoader或setContextClassLoader方法的子类的构造函数直接或间接调用其checkPermission方法时,
// 会使用RuntimePermission("enableContextClassLoaderOverride")权限调用其checkPermission方法。
// 新创建线程的优先级与创建它的线程(即当前运行的线程)的优先级相等。setPriority方法可以用来将优先级更改为一个新值。
// 当仅创建线程的线程当前被标记为守护线程时,新创建的线程最初被标记为守护线程。setDaemon方法可以用来改变一个线程是否是一个守护进程。
public Thread(ThreadGroup group,Runnable target,String name){}
// 这个构造函数和Thread(ThreadGroup,Runnable,String)是一样的,只是它允许指定线程堆栈大小。
// 堆栈大小是虚拟机为这个线程的堆栈分配的地址空间的大约字节数。stackSize参数的影响(如果有的话)高度依赖于平台。
// 在某些平台上,为stackSize参数指定更高的值可能会允许线程在抛出StackOverflower错误之前获得更大的递归深度。
// 类似地,指定一个较低的值可能会允许更多线程同时存在,而不会抛出OutOfMemoryError(或其他内部错误)。
// stackSize参数的值与最大递归深度和并发级别之间的关系的细节取决于平台。在某些平台上,stackSize参数的值可能没有任何影响。
// 由于此构造函数的行为依赖于平台,因此在使用时应格外小心。执行给定计算所需的线程堆栈大小可能因JRE实现的不同而不同。
// 根据这种变化,可能需要仔细地调优堆栈大小参数,并且可能需要对要运行应用程序的每个JRE实现重复调优。
public Thread(ThreadGroup group,Runnable target,String name,long stackSize){}


常用方法

// 返回当前执行的线程对象的引用
public static Thread currentThread(){}

// 给调度程序的一个提示,表明当前线程愿意放弃当前对处理器的使用(让出CPU执行权)。调度程序可以随意忽略此提示。
// Yield是一种启发式尝试,旨在改善线程之间的相对进程,否则会过度使用CPU。
// 它的使用应该与详细的分析和基准测试相结合,以确保它实际上具有预期的效果。
// 使用这种方法很少是合适的。它可能用于调试或测试目的,在这些目的中,
// 它可能有助于由于竞争条件而重新生成错误。在设计并发控制构造(如java.util.concurrent.locks包中的构造)时,它可能也很有用。
// 当一个线程调用这个方法的时候,当前线程让出CPU执行权,处于就绪状态,线程调度器从线程就绪队列里获取一个线程优先级最高的线程,也有可能
// 是刚刚让出CPU执行权的这个线程来获取CPU执行权。
public static void yield(){}

// 根据系统计时器和调度器的精度和准确性,使当前执行的线程在指定的毫秒数内处于睡眠状态(暂时停止执行)。线程不会失去任何监视器的所有权。
// millis 睡眠时间以毫秒为单位;nanos 0~999999 额外的纳秒睡眠时间
public static void sleep(long millis) throws InterruptedException{}
public static void sleep(long millis, int nanos) throws InterruptedException{}

// 使该线程开始执行;Java虚拟机调用该线程的run方法。
// 两个线程同时运行:当前线程(它从对start方法的调用中返回)和另一个线程(它执行它的run方法)。
// 多次启动一个线程是不合法的。特别是线程在完成执行后可能不会重新启动。
public void start(){}

// 如果这个线程是使用一个单独的Runnable运行对象构造的,那么这个Runnable对象的run方法就会被调用; 否则,此方法不执行任何操作并返回。
public void run(){}

// 中断线程
// 除非当前线程正在中断自身(这是始终允许的),否则会调用此线程的checkAccess方法,这可能会导致抛出SecurityException。
// 如果该线程在调用对象类的wait() wait(long)或wait(long,int)方法或该类的join()、join(long)、join(long,int)、sleep(long)
// 或sleep(long,int)方法时被阻塞,那么它的中断状态将被清除,并接收到InterruptedException。
// 如果这个线程在一个InterruptibleChannel的I/O操作中被阻塞,那么该通道将被关闭,线程的中断状态将被设置,并且该线程将收到一个ClosedByInterruptException异常。
// 如果这个线程在Selector中被阻塞,那么线程的中断状态将被设置,并且它将立即从选择操作中返回,可能带有一个非零值,就像调用了选择器的唤醒方法一样。
// 如果前面的条件都不满足,那么这个线程的中断状态将被设置。中断一个非活动的线程不需要有任何影响。
public void interrupt(){}

// 测试当前线程是否已中断。此方法清除线程的中断状态。
// 换句话说,如果这个方法被连续调用两次,第二次调用将返回false(除非当前线程在第一次调用清除其中断状态之后,第二次调用检查之前再次中断)。
public static boolean interrupted(){}

// 测试此线程是否已被中断。线程的中断状态不受此方法的影响。
// 由于线程在中断时不是活动的而忽略的线程中断将由这个返回false的方法反映出来。
public boolean isInterrupted(){}

// 测试此线程是否处于活动状态。如果一个线程已经启动并且还没有死亡,那么它就是活的。
public final boolean isAlive(){}

// 更改线程的优先级。首先,调用该线程的checkAccess方法时不带参数。这可能会导致抛出SecurityException。
// 否则,该线程的优先级将设置为指定的newPriority和该线程线程组的最大允许优先级中的较小者。
public final void setPriority(int newPriority){}
// 获取线程优先级
public final int getPriority(){}

// 更该线程名。同样会不带参数调用该线程的checkAccess方法。这可能导致抛出一个SecurityException。
public final void setName(String name){}
// 获取线程名
public final String getName(){}

// 返回该线程所属的线程组。如果该线程已经死亡(已停止),则该方法返回null。
public final ThreadGroup getThreadGroup(){}

// 返回当前线程的线程组及其子组中的活动线程数的估计值。递归地遍历当前线程的线程组中的所有子组。
// 返回的值只是一个估计值,因为在此方法遍历内部数据结构时,线程的数量可能会动态变化,可能会受到某些系统线程的影响。这种方法主要用于调试和监视。
public static int activeCount(){}

// 等待子线程执行完,在继续执行。
public final void join() throws InterruptedException{}
// 等待该线程死亡的时间最多为毫秒及纳秒。为0意味着永远等待。
public final void join(long millis) throws InterruptedException{}
public final void join(long millis,int nanos) throws InterruptedException{}

// 将该线程标记为守护线程或用户线程。当运行的线程都是守护线程时,Java虚拟机将退出。必须在线程启动之前调用此方法。
public final void setDaemon(boolean on){}
// 测试该线程是否是守护线程。
public final boolean isDaemon(){}

// 确定当前运行的线程是否有修改此线程的权限。
// 如果有一个安全管理器,它的checkAccess方法将以这个线程作为它的参数来调用。这可能导致抛出一个SecurityException。
public final void checkAccess(){}

// 返回该线程的字符串表示形式,包括线程的名称、优先级和线程组。
public String toString(){}
// 返回该线程的标识符。线程ID是创建该线程时生成的一个正长号码。线程ID是唯一的,并且在其生命周期内保持不变。当一个线程终止时,这个线程ID可以被重用。
public long getId(){}

// 返回此线程的状态。这种方法是为了监控系统状态而设计的,而不是为了同步控制。
public Thread.State getState(){}

// 打印当前线程的堆栈跟踪到标准错误流。此方法仅用于调试。
public static void dumpStack(){}

Executor

执行提交的可运行任务的对象。该接口提供了一种将任务提交与每个任务的运行机制分离的方法,包括线程使用、调度等细节。

通常使用执行器,而不是像原来那样创建线程。比如,而不是调用 new Thread(new(RunnableTask())).start(); 这样去启动一个线程。

对于一组任务中的每一项,可以使用:

Executor executor = anExecutor;

executor.execute(new RunnableTask1());

executor.execute(new RunnableTask2());

然而,Executor接口并不严格要求执行是异步的。在最简单的情况下,执行者可以在调用者的线程中立即运行提交的任务:

class DirectExecutor implements Executor {
   public void execute(Runnable r) {
     r.run();
   }
}

更典型的情况是,任务在调用方线程以外的一些线程中执行。下面的执行器为每个任务生成一个新线程。

class ThreadPerTaskExecutor implements Executor {
   public void execute(Runnable r) {
     new Thread(r).start();
   }
}

许多Executor实现对任务调度的方式和时间施加了某种限制。下面的executor将任务提交给第二个executor序列化,说明了复合executor

class SerialExecutor implements Executor {
   final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
   final Executor executor;
   Runnable active;

   SerialExecutor(Executor executor) {
     this.executor = executor;
   }

   public synchronized void execute(final Runnable r) {
     tasks.offer(new Runnable() {
       public void run() {
         try {
           r.run();
         } finally {
           scheduleNext();
         }
       }
     });
     if (active == null) {
       scheduleNext();
     }
   }

   protected synchronized void scheduleNext() {
     if ((active = tasks.poll()) != null) {
       executor.execute(active);
     }
   }
}

这个包中提供的Executor实现,实现了ExecutorService,这是一个更广泛的接口。ThreadPoolExecutor类提供了一个可扩展的线程池实现。Executors类为这些执行器提供了方便的工厂方法。

内存一致性影响:在将可运行对象提交给执行器之前,线程中的操作发生在其执行开始之前,可能发生在另一个线程中。

这个仅是一个接口,提供了 void execute(Runnable command) 这么一个方法;

就是在将来的某个时间执行给定的命令。该命令可以在新线程、池线程或调用线程中执行,具体取决于执行者实现。

像基于这个接口扩展实现的类挺多的,而且应用广泛,比如ThreadPoolExecutor,TaskExecutor等等.

Executors

此包中定义的Executor、ExecutorService、ScheduledExecutorService、ThreadFactory和可调用类的工厂和实用程序方法。此类支持以下几种方法:

1)创建并返回ExecutorService的实现方法,该服务使用常用的配置进行设置。

2)创建并返回使用常用配置设置的ScheduledExecutorService。

3)创建并返回包装ExecutorService的方法,该服务通过使特定于实现的方法不可访问来禁用重新配置。

4)创建并返回一个ThreadFactory,该ThreadFactory将新创建的线程设置为已知状态。

5)从其他类似闭包的表单中创建并返回可调用的,因此可以在需要可调用的执行方法中使用。

简单说Executors提供了一系列工厂方法用于创先线程池,返回的线程池都实现了ExecutorService接口。主要有newFixedThreadPool,newCachedThreadPool,newSingleThreadExecutor,newScheduledThreadPool 四种。等下就着重讲解这几个方法。


非常重要面试必问


提供方法

如果细看里面的每个方法实现,可以发现没有过多的逻辑,都是帮助创建new一个对象返回。叫它 ExecutorFactoryMethod 感觉更合适些。

像这样的类就没必要细看,因为它没有很多东西,都是把别人的拿过来,做了个封装门面;

// 这几个方法都是返回一个Callable对象,当被调用时,返回给定的任务结果。
public static Callable<Object> callable(PrivilegedAction<?> action){}
public static Callable<Object> callable(PrivilegedExceptionAction<?> action){}
public static Callable<Object> callable(Runnable task){}
public static <T> Callable<T> callable(Runnable task, T result){}
public static <T> Callable<T> privilegedCallable(Callable<T> callable){}
public static <T> Callable<T> privilegedCallableUsingCurrentClassLoader(Callable<T> callable){}
// 返回用于创建新线程的默认线程工厂。
public static ThreadFactory defaultThreadFactory(){}
// 创建一个线程池,该线程池根据需要创建新线程,但将在可用时重用以前构造的线程。
// 也可以根据需要时使用提供的ThreadFactory来创建新线程。
public static ExecutorService newCachedThreadPool(){}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory){}
// 创建一个线程池,该线程池重用固定数量的在共享无界队列上操作的线程。
// 也可以根据需要时使用提供的ThreadFactory来创建新线程。
public static ExecutorService newFixedThreadPool(int nThreads){}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory){}
// 创建一个Executor,该Executor使用单个工作线程在无界队列上操作。
// 同样并在需要时使用提供的ThreadFactory创建一个新线程。
public static ExecutorService newSingleThreadExecutor(){}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory){}
// 创建一个工作窃取线程池,使用所有可用的处理器作为其目标并行级别。
public static ExecutorService newWorkStealingPool(){}
// 创建一个线程池,该线程池维护足够的线程以支持给定的并行性级别,并且可以使用多个队列来减少争用。
public static ExecutorService newWorkStealingPool(int parallelism){}
// 创建一个单线程执行器,该执行器可以安排命令在给定的延迟后运行或定期执行。
public static ScheduledExecutorService newSingleThreadScheduledExecutor(){}
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory){}
// 返回一个线程工厂,用于创建与当前线程具有相同权限的新线程。
public static ThreadFactory privilegedThreadFactory(){}

接下来就看看很重要的那几个方法 newFixedThreadPool、newCachedThreadPool、newSingleThreadExecutor、newScheduledThreadPool 四个方法:

先把这几个方法都拿出来看下:

// 创建一个固定大小的线程池,该线程池重用在共享无界队列上运行的固定数量的线程。在任何时候,线程都是最多活动的处理任务。
// 如果在所有线程都处于活动状态时提交其他任务,它们将在队列中等待,直到有线程可用。如果任何线程在停止之前的执行过程中由于故障而终止,
// 那么如果需要执行后续任务,将替换一个新线程。池中的线程将一直存在,直到显式关闭。
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}

// 创建一个可缓冲的线程池,该线程池根据需要创建新线程,但在以前构造的线程可用时将重用这些线程。
// 这些池通常会提高执行许多短期异步任务的程序的性能。调用execute将重用以前构造的线程(如果可用)。
// 如果没有可用的现有线程,将创建一个新线程并将其添加到池中。60秒未使用的线程将被终止并从缓存中删除。
// 因此,闲置足够长时间的池不会消耗任何资源。请注意,可以使用ThreadPoolExecutor构造函数创建具有类似属性但不同细节(例如超时参数)的池。
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}

// 创建一个单个线程的线程执行器,该执行器使用一个工作线程在无界队列上运行。(但是请注意,如果此单线程在关机前的执行过程中由于故障而终止,
// 那么如果需要执行后续任务,将使用一个新线程代替它。)任务保证按顺序执行,并且在任何给定时间都不会有多个任务处于活动状态。
// 与其他等效的newFixedThreadPool 不同,返回的执行器保证不可重新配置以使用其他线程。
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService(
    	new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>())
     );
}

// 创建一个可延迟的线程池,该线程池可以安排命令在给定延迟后运行或定期执行。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

// 下面这个是 return new ScheduledThreadPoolExecutor(corePoolSize) 的构造函数;
public ScheduledThreadPoolExecutor(int corePoolSize) {
      super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
}
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {}

可以看出这个几个方法,都是创建了 ThreadPoolExecutor 对象,或者在外层做了包装。那重点就来到了 ThreadPoolExecutor;

ThreadPoolExecutor

ThreadPoolExecutor是一种ExecutorService,使用可能的几个池线程之一执行每个提交的任务,通常使用Executors工厂方法进行配置。

线程池解决两个不同的问题:它们通常在执行大量异步任务时提供更好的性能,这是因为减少了每个任务的调用开销;它们还提供了一种方法,用于限制和管理执行任务集合时消耗的资源,包括线程。每个ThreadPoolExecutor还维护一些基本统计信息,例如已完成任务的数量。

为了在广泛的上下文中发挥作用,这个类提供了许多可调参数和可扩展性挂钩。然而,程序员被鼓励使用更方便的Executors工厂方法Executors.newcachedthreadpool()(无限线程池,带有自动线程回收),Executors. newfixedthreadpool (int)(固定大小的线程池)和Executors. newsinglethreadexecutor()(单个后台线程),为最常见的使用场景预先配置设置。否则,在手动配置和调优该类时,请使用以下指南:


1)核心线程数和最大线程数

ThreadPoolExecutor会根据corePoolSize()和maximumPoolSize()设置的边界自动调整池的大小(参见getPoolSize())。当一个新任务在方法execute(Runnable)中提交,并且运行的线程少于corePoolSize时,一个新线程被创建来处理该请求,即使其他工作线程空闲。如果有大于corePoolSize但小于maximumPoolSize的线程正在运行,则只有当队列已满时才会创建一个新的线程。通过将corePoolSize和maximumPoolSize设置为相同,可以创建一个固定大小的线程池。通过将maximumPoolSize设置为一个本质上无界的值,比如Integer.MAX_VALUE,允许池容纳任意数量的并发任务。最典型的情况是,核心池和最大池大小仅在构造时设置,但它们也可以使用setCorePoolSize(int)和setMaximumPoolSize(int)动态更改。


2)按需构建

默认情况下,即使核心线程最初也会创建,并仅在新任务到达时启动,但这可以使用方法prestartCoreThread()或prestartAllCoreThreads()动态覆盖。如果使用非空队列构建池,可能需要预启动线程。


2-1)创建新线程

使用ThreadFactory创建新线程。如果没有指定,则使用executtors.defaultthreadfactory(),该方法创建的线程都在同一个线程组中,具有相同的NORM_PRIORITY优先级和非守护进程状态。通过提供一个不同的ThreadFactory,你可以改变线程的名称、线程组、优先级、守护进程状态等。如果ThreadFactory通过newThread返回null请求创建线程失败,执行器将继续执行,但可能无法执行任何任务。线程应该拥有“modifyThread”RuntimePermission。如果使用该池的工作线程或其他线程不拥有此权限,则服务可能降级:配置更改可能不会及时生效,并且关机池可能保持在可能终止但未完成的状态。


2-2)存活时间

如果池当前有超过corePoolSize线程,多余的线程将被终止,如果它们的空闲时间超过了keepAliveTime(参见getKeepAliveTime(TimeUnit))。这提供了一种方法,可以在池没有被积极使用时减少资源消耗。如果以后池变得更活跃,则会构造新的线程。这个参数也可以使用方法setKeepAliveTime(long, TimeUnit)动态更改。Long.MAX_VALUE TimeUnit.NANOSECONDS 有效地禁止空闲线程在关闭之前终止。缺省情况下,只有当corePoolSize以上的线程时,该策略才生效。但是allowCoreThreadTimeOut(boolean)方法也可以用于将这个超时策略应用于核心线程,只要keepAliveTime值不为零。


2-3)排队

任何BlockingQueue都可以用来传输和保存提交的任务。此队列的使用与池大小交互:

a)如果运行的线程少于corePoolSize,Executor总是倾向于添加一个新线程而不是排队。

b)如果corePoolSize或更多的线程正在运行,Executor总是倾向于将请求排队,而不是添加一个新线程。

c)如果请求不能排队,则创建一个新线程,除非该线程超过maximumPoolSize,在这种情况下,任务将被拒绝。


有三种排队的策略:


直接的传递。

对于工作队列来说,一个好的默认选择是SynchronousQueue,它将任务传递给线程,而不持有它们。在这里,如果没有立即可用的线程来运行任务,尝试对任务进行排队将失败,因此将构造一个新线程。当处理可能具有内部依赖关系的请求集时,此策略可以避免锁定。直接切换通常需要无限制的maximumpoolsize,以避免拒绝新提交的任务。反过来,当命令的平均到达速度超过处理速度时,就有可能出现无限制的线程增长。


无界队列。

使用无界队列(例如,没有预定义容量的LinkedBlockingQueue)将导致在所有corePoolSize线程都繁忙时,新的任务在队列中等待。因此,只会创建corePoolSize线程。(因此maximumPoolSize的值没有任何影响。)当每个任务完全独立于其他任务时,这可能是合适的,因此任务不会影响其他任务的执行;例如,在一个网页服务器中。虽然这种类型的排队在平滑短暂的请求突发方面很有用,但当命令的平均到达速度超过它们的处理速度时,它承认了无限工作队列增长的可能性。


有界的队列。

当使用有限的maximumPoolSizes时,有界队列(例如ArrayBlockingQueue)有助于防止资源耗尽,但可能更难调优和控制。队列大小和最大池大小可以相互权衡:使用大型队列和小型池可以最小化CPU使用、OS资源和上下文切换开销,但可能会人为地降低吞吐量。如果任务经常阻塞(例如,如果它们受I/O限制),系统可能会为比您所允许的更多的线程调度时间。使用小队列通常需要更大的池大小,这使得cpu更加繁忙,但可能会遇到不可接受的调度开销,这也会降低吞吐量。


3)拒绝任务

在方法execute(Runnable)中提交的新任务将在Executor已经关闭,以及Executor使用了有限的最大线程和工作队列容量,并且饱和时被拒绝。在这两种情况下,execute方法都调用RejectedExecutionHandler.reectedexecutionhandler的reectedexecutionhandler方法。提供了四个预定义的处理程序策略

a)在默认ThreadPoolExecutor中。在被拒绝时,处理程序抛出一个运行时RejectedExecutionException。

b)在ThreadPoolExecutor.callerrunsppolicy,调用execute本身的线程运行该任务。这提供了一个简单的反馈控制机制,可以降低新任务提交的速度。

c)在ThreadPoolExecutor.DiscardPolicy,一个不能执行的任务将被简单地删除。

d)在ThreadPoolExecutor.DiscardOldestPolicy,如果执行器没有关闭,工作队列头的任务将被删除,然后重试执行(可能再次失败,导致重复执行)。

也可以定义和使用其他类型的RejectedExecutionHandler类。这样做需要一些注意,特别是当策略设计为仅在特定容量或排队策略下工作时。


4)钩子法

该类提供受保护的可重写beforeExecute(Thread, Runnable)和afterExecute(Runnable, Throwable)方法,它们在每个任务执行之前和之后被调用。这些可以用来操纵执行环境;例如,重新初始化ThreadLocals、收集统计信息或添加日志条目。此外,可以重写terminate()方法来执行在Executor完全终止后需要执行的任何特殊处理。

如果钩子或回调方法抛出异常,内部工作线程可能反过来失败并突然终止。


5)队列维护

getQueue()方法允许访问工作队列,以便进行监视和调试。强烈建议将此方法用于其他目的。

提供的两个方法remove(Runnable)和purge()可用于在大量排队任务被取消时协助存储回收。


6)终止

在程序中不再引用且没有剩余线程的池将自动关闭。如果你想确保即使用户忘记调用shutdown(),未被引用的池也被回收,那么你必须通过设置适当的保持活动时间,使用0内核线程的下界和/或设置allowCoreThreadTimeOut(boolean)来安排未被使用的线程最终死亡。

扩展示例:这个类的大多数扩展会覆盖一个或多个受保护的钩子方法。例如,下面是一个子类,它添加了一个简单的暂停/恢复特性:

class PausableThreadPoolExecutor extends ThreadPoolExecutor {
   private boolean isPaused;
   private ReentrantLock pauseLock = new ReentrantLock();
   private Condition unpaused = pauseLock.newCondition();

   public PausableThreadPoolExecutor(...) { super(...); }

   protected void beforeExecute(Thread t, Runnable r) {
     super.beforeExecute(t, r);
     pauseLock.lock();
     try {
       while (isPaused) unpaused.await();
     } catch (InterruptedException ie) {
       t.interrupt();
     } finally {
       pauseLock.unlock();
     }
   }

   public void pause() {
     pauseLock.lock();
     try {
       isPaused = true;
     } finally {
       pauseLock.unlock();
     }
   }

   public void resume() {
     pauseLock.lock();
     try {
       isPaused = false;
       unpaused.signalAll();
     } finally {
       pauseLock.unlock();
     }
   }
 }


构造方法&属性

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                          TimeUnit unit, BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                          BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);
}

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                          BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler);
}
// 上面的this都指向了这个方法,一些核心参数由程序员指定,其他的可以使用默认的,也可以使用这个方法传进来。
// 使用给定的初始参数创建一个新的ThreadPoolExecutor。主要看这几个入参
// corePoolSize:池中要保留的核心线程数,即使它们处于空闲状态,除非设置了allowCoreThreadTimeOut。
// maximumPoolSize:池中允许的最大线程数。
// keepAliveTime:当线程数大于核心数时,多余空闲线程在终止前等待新任务的最长时间。
// unit:keepAliveTime参数的时间单位(分钟、秒、毫秒等)。
// workQueue:用于在任务执行前保存任务的队列。此队列将仅包含execute方法提交的可运行任务。
// threadFactory:执行器创建新线程时要使用的工厂。
// handler:由于达到线程边界和队列容量而阻止执行时要使用的处理方式。
// 通过 Executors 类里的那几个方法我们就知道,那些参数是比较核心的;
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                          BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
    if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}


线程池核心参数详解:

corePoolSize:表示线程池中的核心线程数,也就是当提交一个任务时,线程池就会创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。

maximumPoolSize:表示线程池中允许的最大线程数。如果当前阻塞队列满了,还在不端的提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize;

keepAliveTime:表示线程池维护线程所允许的空闲时间。当线程池中的线程数量大于corePoolSize的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime;

unit:keepAliveTime的单位;

workQueue:它是用来保存等待被执行的任务的阻塞队列,任务必须实现Runable接口。在JDK中提供了如下这么几个阻塞队列:

1)ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO(先进先出)进行排序任务;

2)LinkedBlockingQuene:基于链表结构的阻塞队列,也是按照FIFO(先进先出)进行排序任务,吞吐量要比ArrayBlockingQuene高;

3)SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则它的插入操作一直是处于阻塞状态,它的吞吐量要比LinkedBlockingQuene高;

4)PriorityBlockingQuene:是一个具有优先级的无界阻塞队列;

ThreadFactory:它是ThreadFactory类型的变量,用来创建新线程。默认使用Executors.defaultThreadFactory() 来创建线程。使用默认的ThreadFactory来创建线程时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置了线程的名称。

handler:线程池的饱和策略,也就是当阻塞队列满了,还没有空闲的工作线程,如果继续提交任务,就必须采取一种策略来处理该任务,那么线程池它提供了4种策略:

1)AbortPolicy:直接抛出异常,是默认的策略;

2)CallerRunsPolicy:它是用调用者所在的线程来执行任务;

3)DiscardOldestPolicy:这个策略会丢弃阻塞队列中靠最前的任务,并且来执行当前任务;

4)DiscardPolicy:这个会直接丢弃新来的任务;

这上面的4种策略都是ThreadPoolExecutor的内部类。我们也可以根据应用场景实现RejectedExecutionHandler接口,自定义一个饱和策略,比如记录日志或持久化存储不能处理的任务。

线程池

线程池(ThreadPool)一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。

任务调度以执行线程的常见方法是使用同步队列,称作任务队列。池中的线程等待队列中的任务,并把执行完的任务放入完成队列中。

线程池模式一般分为两种:HS/HA半同步/半异步模式、L/F领导者与跟随者模式。

半同步/半异步模式又称为生产者消费者模式,是比较常见的实现方式,比较简单。分为同步层、队列层、异步层三层。同步层的主线程处理工作任务并存入工作队列,工作线程从工作队列取出任务进行处理,如果工作队列为空,则取不到任务的工作线程进入挂起状态。由于线程间有数据通信,因此不适于大数据量交换的场合。

领导者跟随者模式是在线程池中的线程可处在3种状态之一:领导者leader、追随者follower或工作者processor。任何时刻线程池只有一个领导者线程。事件到达时,领导者线程负责消息分离,并从处于追随者线程中选出一个来当继任领导者,然后将自身设置为工作者状态去处置该事件。处理完毕后工作者线程将自身的状态置为追随者。这一模式实现复杂,但避免了线程间交换任务数据,提高了CPU Cache相似性。在ACE(Adaptive Communication Environment)中,提供了领导者跟随者模式实现。



线程池的伸缩性对性能也有很大的影响:

创建太多线程,将会浪费一定的资源,有些线程未被充分使用。

销毁太多线程,将导致之后浪费时间再次创建它们。

创建线程太慢,将会导致长时间的等待,性能变差。

销毁线程太慢,导致其它线程资源饥饿。

ThreadLocal

我们都知道多线程在访问同一个共享变量的时候特别容易出现并发问题,特别是在多线程需要对一个共享变量进行写入的时候。那么为了保证线程的安全,一般在访问共享变量时候适当的进行同步(也就是加锁),那使用者对加锁就得有一定的了解。

加重了使用者的负担。那么有没有一种方式可以做到,当创建一个共享变量之后,多线程在访问的时候,每个线程访问的是自己线程的变量呢?ThreadLocal就可以做这件事,但它并不是为了解决这个问题出现的。

ThreadLocal是JDK包提供的,表示是线程的一个本地变量。也就是如果你创建了一个ThreadLocal变量,那么访问这个变量的每个线程都会有这个变量的一个本地副本。当多个线程操作这个变量的时候,其实操作的是自己本地内存的变量,从而避免了线程安全的问题。

public class ThreadLocalSample {

    // 创建ThreadLocal变量
    private static ThreadLocal<String> tl = new ThreadLocal<>();

    // 创建并运行两个线程
    public static void main(String[] args) {
        // 线程1
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                tl.set("我是T1线程 ThreadLocal 本地变量");
                print("T1");
            }
        });
        // 线程2
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                tl.set("我是T2线程 ThreadLocal 本地变量");
                print("T2");
            }
        });
        // 启动线程
        t1.start();
        t2.start();
    }

    private static void print(String str){
        // 打印当前线程本地内存中的ThreadLocal变量
        System.out.println(str + " == " + tl.get());
        // 清除当前线程本地内存中的ThreadLocal变量
//        tl.remove();
    }
}

// 控制台输出
T2 == 我是T2线程 ThreadLocal 本地变量
T1 == 我是T1线程 ThreadLocal 本地变量

ThreadLocal主要还是依赖线程(Thread)做事情,在Thread类里有这么两个属性

// ThreadLocal values pertaining to this thread. This map is maintained by the ThreadLocal class.
// 与此线程相关的线程本地值。此映射由ThreadLocal类维护
ThreadLocal.ThreadLocalMap threadLocals = null;

// InheritableThreadLocal values pertaining to this thread. This map is maintained by the InheritableThreadLocal class.
// 与此线程相关的InheritableThreadLocal值。此映射由InheritableThreadLocal类维护。
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;

在 ThreadLocal 里面有一个定制化的map,就是ThreadLocalMap;ThreadLocalMap是一个定制的hashMap,仅适用于维护线程本地值。在ThreadLocal类之外不会导出任何操作。该类是包私有的,允许在类线程中声明字段。为了帮助处理非常大和长期的用法,哈希表条目使用weakreference作为键。但是,由于不使用引用队列,因此只有当表开始耗尽空间时,才能保证删除过时的条目。

注意ThreadLocal不支持线程之间的继承性,也就是说在父线程设置的本地变量,在子线程是获取不到的,如下:

// 创建ThreadLocal变量
private static ThreadLocal<String> tl = new ThreadLocal<>();

private static void print(String str){
    // 打印当前线程本地内存中的ThreadLocal变量
    System.out.println(str + " == " + tl.get());
}

public static void main(String[] args) {
    tl.set("我是 main 方法主线程 ThreadLocal 本地变量");
    // 线程1
    Thread t1 = new Thread(new Runnable() {
        @Override
        public void run() {
            print("T1");
        }
    });
    t1.start();

    print("main");
}
 
// 控制台输出
main == 我是 main 方法主线程 ThreadLocal 本地变量
T1 == null

就因为两个是不同的线程,所以在main线程设置的变量从T1线程获取不到。那有没有办法能让获取到呢?肯定是有的,就是下面的这个 inheritableThreadLocal 类。

InheritableThreadLocal

inheritableThreadLocal 继承了 ThreadLocal,提供了一个特性,就是可以让子线程去访问在父线程中设置的本地变量。只需要把我们上面 创建ThreadLocal变量的代码修改为

// 创建 InheritableThreadLocal 变量
private static InheritableThreadLocal<String> tl = new InheritableThreadLocal<>();
// 再次运行main方法,控制台输出
main == 我是 main 方法主线程 ThreadLocal 本地变量
T1 == 我是 main 方法主线程 ThreadLocal 本地变量

这个类就重写了3个方法,其他都是用的ThreadLocal父类的。我们上面提过,这个主要就是依赖线程去做事情的,就离不开Thread类。

在 Thread 类的构造函数里

private void init(ThreadGroup g, Runnable target, String name, long stackSize, AccessControlContext acc, boolean inheritThreadLocals) {
    ...代码省略...
    
    // 获取当前线程
    Thread parent = currentThread();
    
    ...代码省略...
    
    // 如果父线程的 inheritableThreadLocals 不为null,就设置子线程的 inheritableThreadLocals 变量
    if (inheritThreadLocals && parent.inheritableThreadLocals != null)
    	this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
     
    ...代码省略...
}



版权声明:本文为weixin_46129103原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。