# 工具类 Executors

注意区别 Executor 类和 ExcetorService ,前者提供的是显式地创建线程运行提交过来的任务, ExecutorService 继承了 Executor ,多提供了管理终止方法和生成 Future 方法。但是创建出来的线程交给谁去运行的呢?接口只是规定了行为,而不会去管实现。

还是先看一下相关的类结构:

Executors 就实现了创建线程池的操作(真的是 Executors 实现的吗?),通过 Executors 可以创建一些特定的线程池:

  • Executors.newCachedThreadPool() :创建一个可缓存的线程池,如果线程池的大小超过了需要,可以灵活回收空闲线程,如果没有可回收线程,则新建线程。默认线程闲置 60 秒就销毁。
  • Executors.newFixedThreadPool(int) :创建一个定长的线程池,可以控制线程的最大并发数,超出的线程会在队列中等待 。
  • Executors.newSingleThreadExecutor() : 创建一个单线程化的线程池,使用一个唯一的工作线程执行任务,保证所有任务按照指 定顺序(先入先出或者优先级)执行 。
  • Executors.newScheduledThreadPool() :创建一个定长的线程池,支持定时、周期性的任务执行
  • Executors.newSingleThreadScheduledExecutor() : 创建一个单线程化的线程池其实返回的是 DelegatedScheduledExecutorService ,类似于包装(代理)类,支持定时、周期性的任务执行
  • Executors.newWorkStealingPool() :创建一个具有并行级别的 work-stealing 线程池

前三个返回都是 ExecutorServiceThreadPoolExecutor ),后面两个是 ScheduledExecutorService

讲一个例子,定时线程池:

public class App {
    // 避免重复创建 Date 对象等
    static SimpleDateFormat format = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss");
    public static void main(String[] args) {
       // 定义定时线程池,ScheduleexecutorService 接口继承了 ExecutorService 接口
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
        // 任务 1:间隔两秒后秒获取当前时间,参数 2 是延时时间,参数 3 是时间单位
        executorService.schedule(() -> {
            System.out.println(format.format(System.currentTimeMillis()));
        }, 2, TimeUnit.SECONDS);
        // 任务 2:间隔两秒后,每隔一秒打印当前时间,参数 3 是之后每个多久就执行一次
        executorService.scheduleAtFixedRate(() -> {
            System.out.println(format.format(System.currentTimeMillis()));}, 2, 1, TimeUnit.SECONDS);
        //executorService.shutdown();
    }
}

我们随便点进去查看一下源码,可以发现这些方法实际是创建了一个 ThreadPoolExecutor 对象或者将 ThreadPoolExecutor 包装了一下,只是构造方法的参数不同,而这些参数就是配置一个线程池的重要部分

我们使用 Executors 创建线程池,本质上是因为 Executors 类提供了使用了 ThreadPoolExecutor 的简单的 ExecutorService 实现。

# ThreadPoolExecutor

先看一下构造方法:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

接受的参数很多,但是必须要全部知道:

  • corePoolSize :线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于 corePoolSize , 即使有其他空闲线程能够执行新来的任务,也会继续创建线程;如果当前线程数为 corePoolSize ,继续提交的任务被保存到阻塞队列中,等待被执行;如果执行了线程池的 prestartAllCoreThreads() 方法,线程池会提前创建并启动所有核心线程。

  • maximumPoolSize 最大线程池大小,当目前线程池中所有的线程都处于运行状态,并且等待队列已满,那么就会直接尝试继续创建新的 非核心线程 运行,但是不能超过最大线程池大小。

  • keepAliveTime 非核心线程最大空闲时间,当一个 非核心线程 空闲超过一定时间,会自动销毁。

  • unit 时间单位

  • workQueue 线程等待队列,当线程池中核心线程数已满时,就会将任务暂时存到等待队列中,直到有线程资源可用为止。

  • threadFactory 线程创建工厂,我们可以干涉线程池中线程的创建过程,进行自定义。

  • handler 拒绝策略,当等待队列线程池都没有空间了,真的不能再来新的任务时,来了个新的多线程任务,那么只能拒绝了,这时就会根据当前设定的拒绝策略进行处理。

线程核心参数是一个重要的面试题,理解记忆,也可以从一个设计者的角度记忆,如果是你在设计线程池,哪些方面你应该考虑,池大小,池中线程存活时间,池创建线程,池拒绝策略等。

为一个线程池分配大小也有讲究,针对线程执行的任务,可以分为 CPU 密集型还是 IO 密集型:

  • CUP 密集型:主要执行计算任务,响应时间很快,CPU 利用率很高,那么线程数应该根据 CPU 核心数决定,CPU 核心数 = 最大同时执行线程数。
  • IO 密集型:主要进行 IO 操作,而 IO 操作容易发生阻塞,时间较长,CPU 就需要等待 IO 操作结束才能继续执行,此时 CPU 就是空闲的,这是可以适当提升线程池大小,让更多的线程一起进行 IO 操作。

一般配置核心参数:

CPU 密集型任务,线程数量 = CPU 核数 + 1

IO 密集型任务,线程数量 = CPU 核数 * 2

# 简单使用

java 线程池的实现原理很简单,说白了就是一个线程集合 workerSet 和一个阻塞队列 workQueue。

想要创建一个自定义的线程池,可以直接 new ThreadPoolExecutor() ,那么我们就要考虑调用哪个构造方法,传递什么参数,我们可以不传入线程创建工厂,使用它默认的:

public static void main(String[] args) throws InterruptedException {
    ThreadPoolExecutor executor =
            new ThreadPoolExecutor(2, 4,
                    3, TimeUnit.SECONDS,
                    new SynchronousQueue<>(),
                    new ThreadPoolExecutor.CallerRunsPolicy());
    for (int i = 0; i < 6; i++) {   
        executor.execute(() -> {
            try {
                System.out.println("任务开始执行...");
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}

解释上面代码:核心线程数为 2,最大线程数为 4,非核心线程空闲存活时间为 3s,使用 SynchronousQueue 阻塞队列,使用 CallerRunsPolicy 拒绝策略。

线程池的拒绝策略默认有一下几个:

  • AbortPolicy (默认):像上面一样,直接抛异常。

  • CallerRunsPolicy :哪个线程提交的就由哪个执行

  • DiscardOldestPolicy :丢弃队列中最近的一个任务,替换为当前任务。

  • DiscardPolicy :什么也不用做。

SynchronousQueue 队列是没有容量的队列,也就是不能将任务放到阻塞队列中。不能传 null ,会抛出异常。

有个值得注意的点,如果阻塞队列使用 SynchronousQueue ,而拒绝策略使用 DiscardOldestPolicy 的话,会 StackOverflowError

我们先看一下,当提交一个任务:

executor.execute(()-> System.out.println(100));

而此时线程池执行任务的线程已经满了,就会将该任务加入到阻塞队列中,但是此时加入的时候,发现阻塞队列满了(因为本来就没有容量),就会将任务传给拒绝策略。这些都是 execute() 方法里做的事。

看一下 DiscardOldestPolicy 拒绝策略的源码:

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public DiscardOldestPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();   // 出队操作,但是这对于 SynchronousQueue 来说毫无意义
            e.execute(r);     // 调用 execute 方法,如果还没有线程空闲,就可以放入阻塞独队列
        }
    }
}

如果线程池的线程迟迟没有空闲,这两个方法就会无限递归下去,然后爆栈。

# 关闭线程池

遍历线程池中的所有线程,然后逐个调用线程的 interrupt 方法来中断线程。其中涉及到两个方法:

  • shutdown 方法:将线程池的状态设置为 SHUTWDOWN 状态,正在执行的任务会继续执行下去,没有被执行线程则中断。在队列中的任务会被继续执行。
  • shutdownNow 则是将线程池的状态设置为 STOP ,所有线程都被中断,正在执行的任务则被停止,没被执行的任务会形成一个任务列表被返回。
public class Main {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            final int j = i;
            executorService.execute(()->f(j));
        }
        //shutdown,会将队列中的任务继续运行,而 shutdownNow 不会
        executorService.shutdownNow();
        System.out.println(executorService.isShutdown());
        System.out.println(executorService.isTerminated());
    }
    public static void f(int i) {
        System.out.println("第" + i + "个任务执行开始...");
        // 输入一个字符串并打印
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            System.out.println("第" + i + "个任务执行被中断...");
            return;
        }
        System.out.println("第" + i + "个任务执行结束...");
    }
}

需要注意的是,中断调用的方法是 Thread.interrupt()这种方法的作用有限,如果线程中没有 sleepwaitCondition 、定时锁等应用, interrupt() 方法是无法中断当前的线程的。所以, ShutdownNow() 并不代表线程池就一定立即就能退出,它可能必须要等待所有正在执行的任务都执行完成了才能退出。

你可以测试一下,把 f(int i) 方法的 Thread.Sleep(1000); 换成 new Scanner(System.in).nextLine() ,那么即使调用了 shutdownNow() ,程序也不会停止。

你可能注意到, f 接收的参数被我设置成了 final ,为啥需要设置为 final,你可以查一下,也可以看看这篇文章

# 自定义拒绝策略

通过查看源码可知, ThreadPoolExecutor 接受的拒绝策略参数是 RejectedExecutionHandler ,所以我们可以自己写一个决绝策略的类,继承 RejectedExecutionHandler 接口:

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

写一个实现类:

public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {
 
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // 阿巴阿巴操作
        System.out.println(r.toString() + " is rejected");
    }
 
}

测试一下:

public class Main {
    public static void main(String[] args) {
        ExecutorService executorService = new ThreadPoolExecutor(5,5,12,
                TimeUnit.SECONDS,new SynchronousQueue<>()
                new RejectedExecutionHandlerImpl());
        for (int i = 0; i < 6; i++) {
            final int j = i;
            executorService.execute(()->f(j));
        }
    }
    public static void f(int i) {
        System.out.println("第" + i + "个任务执行开始...");
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            System.out.println("第" + i + "个任务执行被中断...");
            return;
        }
        System.out.println("第" + i + "个任务执行结束...");
    }
}

# 源码解析

ThreadPoolExecutor 的重要属性其实就是构造函数传入的那几个参数,其他重要的还有:

// 原子操作类,避免出现并发问题,低 29 位表示线程池中线程数,通过高 3 位表示线程池的运行状态
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//worker 的集合,用 set 来存放
private final HashSet<Worker> workers = new HashSet<Worker>();

看一下线程池运行状态以及转换:

# 任务执行

execute –> addWorker –>runworker (getTask)

线程池的工作线程通过 Woker 类实现,在 ReentrantLock 锁的保证下,把 Woker 实例插入到 HashSet 后,并启动 Woker 中的线程。 从 Woker 类的构造方法实现可以发现:线程工厂在创建线程 thread 时,将 Woker 实例本身 this 作为参数传入,当执行 start 方法启动线程 thread 时,本质是执行了 Worker 的 runWorker 方法。 firstTask 执行完成之后,通过 getTask 方法从阻塞队列中获取等待的任务,如果队列中没有任务,getTask 方法会被阻塞并挂起,不会占用 cpu 资源。

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {  
    // 线程池的当前线程数小于 corePoolSize,执行 addWorker 创建新线程执行 command 任务
       if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // double check: c, recheck
    // 线程池处于 RUNNING 状态,把提交的任务成功放入阻塞队列中
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 回滚到入队操作前,即倘若线程池 shutdown 状态,就 remove (command)
        // 如果线程池没有 RUNNING,成功从阻塞队列中删除任务,执行 reject 方法处理任务
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 线程池处于 running 状态,但是没有线程,则创建线程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 往线程池中创建新的线程失败,则 reject 任务
    else if (!addWorker(command, false))
        reject(command);
}

在多线程环境下,线程池的状态时刻在变化,而 ctl.get() 是非原子操作,很有可能刚获取了线程池状态后线程池状态就改变了。判断是否将 command 加入 workque 是线程池之前的状态。倘若没有 double check,万一线程池处于非 running 状态 (在多线程环境下很有可能发生),那么 command 永远不会执行。在双重检查下,如果线程池状态发生变化,就可以对这个任务执行拒绝策略

# 任务关闭

shutdown()shutdownNow() ,上文也讲过了,嘿嘿😘

# 参考

https://www.yuque.com/qingkongxiaguang/javase/bnoif5#ef6ebd71

https://pdai.tech/md/java/thread/java-thread-x-juc-executor-ThreadPoolExecutor.html

https://www.iteye.com/blog/justsee-999189

《Java 并发编程的艺术》第 10 章

深入理解高并发编程(第 1 版)