ThreadPoolExecutor线程池的简单使用说明
ThreadPoolExecutor提供了一些便捷方法创建线程池
newFixedThreadPool():创建固定大小的线程池(使用的线程无法回收,容易线程耗尽)
newCachedThreadPool():创建一个不限制线程数上限的线程池(容易OOM)
newSingleThreadPool():创建一个单线程的线程池
ThreadPoolExecutor:以构造方法形式创建线程池,有7个参数
这些方法虽然比较简便,但是容易产生各种问题,实际生产过程中我们多用构造方法创建线程池
基础案例
先来一个最基础的简单应用:
public class Test01 {
public static void main(String[] args) {
long t1 = System.currentTimeMillis();
ThreadPoolExecutor threadPool = getThreadPool();
long t2 = System.currentTimeMillis();
System.out.println("Thread Pool needs time : "+(t2-t1));
Future f1 = threadPool.submit(new Task1());
Future f2 = threadPool.submit(new Task1());
try {
Object r1 = f1.get();
int result1 = (Integer) r1;
System.out.println("result1 : "+result1);
Object r2 = f2.get();
int result2 = (Integer) r2;
System.out.println("result2 : "+result2);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
private static ThreadPoolExecutor getThreadPool() {
int corePoolSize=2;
int maximumPoolSize=4;
int keepAliveTime=30;
TimeUnit unit = TimeUnit.SECONDS;
BlockingQueue queue = new ArrayBlockingQueue(10);
ThreadFactory threadFactory=null;
RejectedExecutionHandler handler=new ThreadPoolExecutor.AbortPolicy();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,unit,queue,handler);
return threadPoolExecutor;
}
}
class Task1 implements Callable{
@Override
public Object call() throws Exception {
long t1 = System.currentTimeMillis();
int random = new Random().nextInt(10000);
Thread.sleep(random);
long t2 = System.currentTimeMillis();
String name = Thread.currentThread().getName();
System.out.println(name+" -- needs time: "+(t2-t1)+" randomTime : "+random);
return random;
}
}
简单说明:
1. 线程池参数
需要指定几个参数:
corePoolSize:核心线程数数量(一直保证存活的线程)
maximumPoolSize:最大线程数量(如果核心线程数量不够,线程池会创建临时线程,核心线程+临时线程不可大于最大线程数)
keepAliveTime:指定临时线程在空闲多少时间后被销毁,单位由TimeUnit指定
TimeUnit:为keepAliveTime指定单位
BlockingQueue:工作队列的存储方式,一般用BlockingQueue存储
*ThreadFactory:使用自定义线程工厂来创建线程(可不配)
RejectedExecutionHandler:线程处理报错或超时后的策略
1.1. BlockingQueue接口
基于ReentrantLock实现,模型是生产者往该队列放数据,消费者在此队列取数据
当队列数据满时,生产者会产生阻塞,并等待队列中有可用空间再往里面放数据
当队列中没有数据的时候,消费者会阻塞,直到队列中有新的数据
实现类有:
ArrayBlockingQueue、DelayQueue、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue等
主要区别是储存结构和对元素的操作上有所不同,put和take方法是一样的
常用方法:
offer:如果队列没满,返回true,队列满了,返回false–不阻塞
put:如果队列满了,会一直阻塞,直到队列有空间或程序中断–阻塞
offer(Entity e, long timeout, TimeUnit unit):再队列尾部插入一个数据,如果队列满了,则进入阻塞状态–阻塞
poll:如果没有元素,则直接返回null,如果有元素,则出队–不阻塞
take:如果队列空了,会一直阻塞,直到有新数据
poll(long timeout, TimeUnit unit):如果队列有数据,则出队,如果队列没数据,则等待,直到超时时间结束
常用实现类:LinkedBlockingQueue和ArrayBlockingQueue
LinkedBlockingQueue:链表结构,出队和入队各有一把锁,最多可存储整数最大值的数据,可视为无限大小,再高并发场景下很实用
ArrayBlockingQueue:数组结构,整个队列只有一把锁,存储数量需要指定,且无法修改
1.2. RejectedExecutionHandler
拒绝策略有:
AbortPolicy:抛出RejectedExecutionException
DiscardPolicy:什么都不做,直接忽略
DiscardOldestPolicy:丢弃队列中最老的任务,为当前提交的任务腾出位置
CallerRunsPolicy:由提交任务者执行这个任务
2. 线程的简单使用和返回值
2.1. 线程
由于submit()只支持Runnable和Callable,所以我们只能用这两个
另外,如果使用invokeAny(),接收的参数是一个Collection,且并且必须继承Callable
2.2. 单任务接收返回值
通过submit方法获取一个返回值Future,然后再从Future中通过get方法获取返回值
可以在get方法中指定参数限制超时时间:
f1.get(long timeout, TimeUnit unit)
,超时后未完成会抛出
TimeoutException
2.2.3. 多任务接收返回值
如果在一个多任务场景下,我们可以对每一个线程进行提交
submit()
,且执行
future.get()
来获取结果
但是在线程池中也提供了一个多任务的Service:
ExcutorCompletionService
2.2.3.1.
ExcutorCompletionService
ExcutorCompletionService
首先,我们需要准备:
准备一个List(在collection接口下即可),里面放入任务
创建ExecutorCompletionService,把上面例子中创建的
threadPool
交给他
通过遍历提交任务,通过take方法得到Future,然后再通过get方法拿到返回值
缺点:由于线程运行时间不确定,所以take方法在某个线程完成任务后会立即返回结果,不能确定是哪个线程的结果
ArrayList<Callable> list = new ArrayList<Callable>();
list.add(new Task1());
list.add(new Task1());
ExecutorCompletionService ecs = new ExecutorCompletionService<Task1>(threadPool);
for (Callable task:list) {
ecs.submit(task);
}
System.out.println("task size : "+list.size());
try {
for (int i = 0; i<list.size(); i++){
Future future = ecs.take();
Integer result = (Integer) future.get();
if(result != null){
System.out.println("result = "+result);
}
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
在多个线程间协调顺序的时候,我们会用程序计数器控制某个线程在其他线程完毕后再开始执行:
CountDownLatch
具体使用请参照另外文章