JavaSE-线程池(5)- ThreadPoolExecutor常用方法

  • Post author:
  • Post category:java

JavaSE-线程池(5)- ThreadPoolExecutor常用方法

invokeAll

ExecutorService 接口中定义的方法,给定一组任务,在所有任务执行完成时返回一个 Futures 列表,其中包含它们的状态和结果。

/**
 * Executes the given tasks, returning a list of Futures holding
 * their status and results when all complete.
 * {@link Future#isDone} is {@code true} for each
 * element of the returned list.
 * Note that a <em>completed</em> task could have
 * terminated either normally or by throwing an exception.
 * The results of this method are undefined if the given
 * collection is modified while this operation is in progress.
 *
 * @param tasks the collection of tasks
 * @param <T> the type of the values returned from the tasks
 * @return a list of Futures representing the tasks, in the same
 *         sequential order as produced by the iterator for the
 *         given task list, each of which has completed
 * @throws InterruptedException if interrupted while waiting, in
 *         which case unfinished tasks are cancelled
 * @throws NullPointerException if tasks or any of its elements are {@code null}
 * @throws RejectedExecutionException if any task cannot be
 *         scheduled for execution
 */
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException;

invokeAll 使用方式

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExecutorInvokeAllTest {
    static class MyTask implements Callable<Boolean> {

        private int id;

        public MyTask(int id) {
            this.id = id;
        }

        @Override
        public Boolean call() throws Exception {
            try {
                System.out.println(
                    "time:" + System.currentTimeMillis() + " " + Thread.currentThread() + " execute task " + id +
                        " start");
                Thread.sleep(2000);
                System.out.println(
                    "time:" + System.currentTimeMillis() + " " + Thread.currentThread() + " execute task " + id +
                        " finish");
            } catch (InterruptedException e) {
                e.printStackTrace();
                return false;
            }
            return true;
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        ThreadPoolExecutor executor =
            new ThreadPoolExecutor(10, 40, 5000,
                TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(2));
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        List<Callable<Boolean>> tasks = new ArrayList<>();
        for (int i = 1; i <= 5; i++) {
            tasks.add(new MyTask(i));
        }
        List<Future<Boolean>> futures = null;
        try {
            futures = executor.invokeAll(tasks);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("time:" + System.currentTimeMillis() + " 主线程会等待invokeAll执行完成才继续执行");
        for (int i = 0; i < futures.size(); i++) {
            try {
                System.out.println(futures.get(i).get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        executor.shutdown();
        System.out.println("耗时:" + (System.currentTimeMillis() - start));
    }
}

如打印结果所示,主线程会等待 invokeAll 方法中的任务执行完后才继续执行

time:1677229181926 Thread[pool-1-thread-1,5,main] execute task 1 start
time:1677229181927 Thread[pool-1-thread-2,5,main] execute task 2 start
time:1677229181928 Thread[pool-1-thread-3,5,main] execute task 3 start
time:1677229181929 Thread[pool-1-thread-4,5,main] execute task 4 start
time:1677229181930 Thread[pool-1-thread-5,5,main] execute task 5 start
time:1677229183942 Thread[pool-1-thread-2,5,main] execute task 2 finish
time:1677229183942 Thread[pool-1-thread-1,5,main] execute task 1 finish
time:1677229183942 Thread[pool-1-thread-3,5,main] execute task 3 finish
time:1677229183942 Thread[pool-1-thread-5,5,main] execute task 5 finish
time:1677229183942 Thread[pool-1-thread-4,5,main] execute task 4 finish
time:1677229183965 主线程会等待invokeAll执行完成才继续执行
true
true
true
true
true
耗时:2141

一般在使用 invokeAll 方法时建议加上等待时间,防止任务执行时间过长线程一直阻塞,方法定义如下:

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

如果在 1 秒钟时间内任务没有结束将会被取消

futures = executor.invokeAll(tasks,1,TimeUnit.SECONDS);

执行结果如下:

time:1677231068880 Thread[pool-1-thread-1,5,main] execute task 1 start
time:1677231068881 Thread[pool-1-thread-3,5,main] execute task 3 start
time:1677231068880 Thread[pool-1-thread-2,5,main] execute task 2 start
time:1677231068881 Thread[pool-1-thread-4,5,main] execute task 4 start
time:1677231068881 Thread[pool-1-thread-5,5,main] execute task 5 start
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at test.executors.ThreadPoolExecutorInvokeAllTest$MyTask.call(ThreadPoolExecutorInvokeAllTest.java:27)
	at test.executors.ThreadPoolExecutorInvokeAllTest$MyTask.call(ThreadPoolExecutorInvokeAllTest.java:13)
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
	at java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
。。。省略部分报错
java.util.concurrent.CancellationException
	at java.util.concurrent.FutureTask.report(FutureTask.java:121)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at test.executors.ThreadPoolExecutorInvokeAllTest.main(ThreadPoolExecutorInvokeAllTest.java:59)
java.util.concurrent.CancellationException
	at java.util.concurrent.FutureTask.report(FutureTask.java:121)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at test.executors.ThreadPoolExecutorInvokeAllTest.main(ThreadPoolExecutorInvokeAllTest.java:59)
。。。省略部分报错
time:1677231069895 主线程会等待invokeAll执行完成才继续执行
耗时:1113

invokeAny

ExecutorService 接口中定义的方法,给定一组任务,只要有一个任务执行完成就返回这个任务的结果

/**
* Executes the given tasks, returning the result
* of one that has completed successfully (i.e., without throwing
* an exception), if any do. Upon normal or exceptional return,
* tasks that have not completed are cancelled.
* The results of this method are undefined if the given
* collection is modified while this operation is in progress.
* 执行给定的任务,返回成功完成的任务的结果(即没有抛出异常),如果有的话。
* 在正常或异常返回时,未完成的任务将被取消。如果在执行此操作时修改了给定的集合,则此方法的结果是未定义的。
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
   throws InterruptedException, ExecutionException;

invokeAny 使用方式

如下,有5个任务,每个任务等待时间为 0-3 秒,线程池调用 invokeAny 方法获取最终执行的任务名称

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ThreadPoolExecutorInvokeAnyTest {
    static class MyTask implements Callable<String> {

        private int id;

        public MyTask(int id) {
            this.id = id;
        }

        @Override
        public String call() throws Exception {
            Integer randomTime = new Random().nextInt(4) * 1000;
            try {
                System.out.println(
                    "time:" + System.currentTimeMillis() + " " + Thread.currentThread() + " execute task " + id +
                        " start,random time :"+randomTime);
                Thread.sleep(randomTime);
                System.out.println(
                    "time:" + System.currentTimeMillis() + " " + Thread.currentThread() + " execute task " + id +
                        " finish");
            } catch (InterruptedException e) {
                e.printStackTrace();
                return "";
            }
            return "task" + id;
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        ThreadPoolExecutor executor =
            new ThreadPoolExecutor(10, 40, 5000,
                TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(2));
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        List<Callable<String>> tasks = new ArrayList<>();
        for (int i = 1; i <= 5; i++) {
            tasks.add(new MyTask(i));
        }
        String futureResult = null;
        try {
            futureResult = executor.invokeAny(tasks, 3, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        System.out.println("time:" + System.currentTimeMillis() + " 主线程会等待invokeAny执行完成才继续执行");
        System.out.println("执行的任务为:" + futureResult);
        executor.shutdown();
        System.out.println("耗时:" + (System.currentTimeMillis() - start));
    }
}

执行结果如下,task5等待0秒,所以 invokeAny方法返回结果为task5,其他任务等待时间都超过0秒,所以都被取消执行


版权声明:本文为sql2008help原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。