# 工具类 Executors
注意区别 Executor
类和 ExcetorService
,前者提供的是显式地创建线程运行提交过来的任务, ExecutorService
继承了 Executor
,多提供了管理终止方法和生成 Future
方法。但是创建出来的线程交给谁去运行的呢?接口只是规定了行为,而不会去管实现。
还是先看一下相关的类结构:
Executors
就实现了创建线程池的操作(真的是 Executors
实现的吗?),通过 Executors
可以创建一些特定的线程池:
Executors.newCachedThreadPool()
:创建一个可缓存的线程池,如果线程池的大小超过了需要,可以灵活回收空闲线程,如果没有可回收线程,则新建线程。默认线程闲置 60 秒就销毁。Executors.newFixedThreadPool(int)
:创建一个定长的线程池,可以控制线程的最大并发数,超出的线程会在队列中等待 。Executors.newSingleThreadExecutor()
: 创建一个单线程化的线程池,使用一个唯一的工作线程执行任务,保证所有任务按照指 定顺序(先入先出或者优先级)执行 。Executors.newScheduledThreadPool()
:创建一个定长的线程池,支持定时、周期性的任务执行Executors.newSingleThreadScheduledExecutor()
: 创建一个单线程化的线程池其实返回的是DelegatedScheduledExecutorService
,类似于包装(代理)类,支持定时、周期性的任务执行Executors.newWorkStealingPool()
:创建一个具有并行级别的 work-stealing 线程池
前三个返回都是 ExecutorService
( ThreadPoolExecutor
),后面两个是 ScheduledExecutorService
。
讲一个例子,定时线程池:
public class App { | |
// 避免重复创建 Date 对象等 | |
static SimpleDateFormat format = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss"); | |
public static void main(String[] args) { | |
// 定义定时线程池,ScheduleexecutorService 接口继承了 ExecutorService 接口 | |
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4); | |
// 任务 1:间隔两秒后秒获取当前时间,参数 2 是延时时间,参数 3 是时间单位 | |
executorService.schedule(() -> { | |
System.out.println(format.format(System.currentTimeMillis())); | |
}, 2, TimeUnit.SECONDS); | |
// 任务 2:间隔两秒后,每隔一秒打印当前时间,参数 3 是之后每个多久就执行一次 | |
executorService.scheduleAtFixedRate(() -> { | |
System.out.println(format.format(System.currentTimeMillis()));}, 2, 1, TimeUnit.SECONDS); | |
//executorService.shutdown(); | |
} | |
} |
我们随便点进去查看一下源码,可以发现这些方法实际是创建了一个 ThreadPoolExecutor
对象或者将 ThreadPoolExecutor
包装了一下,只是构造方法的参数不同,而这些参数就是配置一个线程池的重要部分。
我们使用 Executors
创建线程池,本质上是因为 Executors
类提供了使用了 ThreadPoolExecutor
的简单的 ExecutorService
实现。
# ThreadPoolExecutor
先看一下构造方法:
public ThreadPoolExecutor(int corePoolSize, | |
int maximumPoolSize, | |
long keepAliveTime, | |
TimeUnit unit, | |
BlockingQueue<Runnable> workQueue, | |
ThreadFactory threadFactory, | |
RejectedExecutionHandler handler) { | |
if (corePoolSize < 0 || | |
maximumPoolSize <= 0 || | |
maximumPoolSize < corePoolSize || | |
keepAliveTime < 0) | |
throw new IllegalArgumentException(); | |
if (workQueue == null || threadFactory == null || handler == null) | |
throw new NullPointerException(); | |
this.acc = System.getSecurityManager() == null ? | |
null : | |
AccessController.getContext(); | |
this.corePoolSize = corePoolSize; | |
this.maximumPoolSize = maximumPoolSize; | |
this.workQueue = workQueue; | |
this.keepAliveTime = unit.toNanos(keepAliveTime); | |
this.threadFactory = threadFactory; | |
this.handler = handler; | |
} |
接受的参数很多,但是必须要全部知道:
corePoolSize
:线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize
, 即使有其他空闲线程能够执行新来的任务,也会继续创建线程;如果当前线程数为corePoolSize
,继续提交的任务被保存到阻塞队列中,等待被执行;如果执行了线程池的prestartAllCoreThreads()
方法,线程池会提前创建并启动所有核心线程。maximumPoolSize
:最大线程池大小,当目前线程池中所有的线程都处于运行状态,并且等待队列已满,那么就会直接尝试继续创建新的非核心线程
运行,但是不能超过最大线程池大小。keepAliveTime
:非核心线程最大空闲时间,当一个非核心线程
空闲超过一定时间,会自动销毁。unit
:时间单位。workQueue
:线程等待队列,当线程池中核心线程数已满时,就会将任务暂时存到等待队列中,直到有线程资源可用为止。threadFactory
:线程创建工厂,我们可以干涉线程池中线程的创建过程,进行自定义。handler
:拒绝策略,当等待队列和线程池都没有空间了,真的不能再来新的任务时,来了个新的多线程任务,那么只能拒绝了,这时就会根据当前设定的拒绝策略进行处理。
线程核心参数是一个重要的面试题,理解记忆,也可以从一个设计者的角度记忆,如果是你在设计线程池,哪些方面你应该考虑,池大小,池中线程存活时间,池创建线程,池拒绝策略等。
为一个线程池分配大小也有讲究,针对线程执行的任务,可以分为 CPU 密集型还是 IO 密集型:
- CUP 密集型:主要执行计算任务,响应时间很快,CPU 利用率很高,那么线程数应该根据 CPU 核心数决定,CPU 核心数 = 最大同时执行线程数。
- IO 密集型:主要进行 IO 操作,而 IO 操作容易发生阻塞,时间较长,CPU 就需要等待 IO 操作结束才能继续执行,此时 CPU 就是空闲的,这是可以适当提升线程池大小,让更多的线程一起进行 IO 操作。
一般配置核心参数:
CPU 密集型任务,线程数量 = CPU 核数 + 1
IO 密集型任务,线程数量 = CPU 核数 * 2
# 简单使用
java 线程池的实现原理很简单,说白了就是一个线程集合 workerSet 和一个阻塞队列 workQueue。
想要创建一个自定义的线程池,可以直接 new ThreadPoolExecutor()
,那么我们就要考虑调用哪个构造方法,传递什么参数,我们可以不传入线程创建工厂,使用它默认的:
public static void main(String[] args) throws InterruptedException { | |
ThreadPoolExecutor executor = | |
new ThreadPoolExecutor(2, 4, | |
3, TimeUnit.SECONDS, | |
new SynchronousQueue<>(), | |
new ThreadPoolExecutor.CallerRunsPolicy()); | |
for (int i = 0; i < 6; i++) { | |
executor.execute(() -> { | |
try { | |
System.out.println("任务开始执行..."); | |
TimeUnit.SECONDS.sleep(1); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
}); | |
} | |
} |
解释上面代码:核心线程数为 2,最大线程数为 4,非核心线程空闲存活时间为 3s,使用 SynchronousQueue
阻塞队列,使用 CallerRunsPolicy
拒绝策略。
线程池的拒绝策略默认有一下几个:
AbortPolicy
(默认):像上面一样,直接抛异常。CallerRunsPolicy
:哪个线程提交的就由哪个执行DiscardOldestPolicy
:丢弃队列中最近的一个任务,替换为当前任务。DiscardPolicy
:什么也不用做。
而 SynchronousQueue
队列是没有容量的队列,也就是不能将任务放到阻塞队列中。不能传 null
,会抛出异常。
有个值得注意的点,如果阻塞队列使用 SynchronousQueue
,而拒绝策略使用 DiscardOldestPolicy
的话,会 StackOverflowError
。
我们先看一下,当提交一个任务:
executor.execute(()-> System.out.println(100)); |
而此时线程池执行任务的线程已经满了,就会将该任务加入到阻塞队列中,但是此时加入的时候,发现阻塞队列满了(因为本来就没有容量),就会将任务传给拒绝策略。这些都是 execute()
方法里做的事。
看一下 DiscardOldestPolicy
拒绝策略的源码:
public static class DiscardOldestPolicy implements RejectedExecutionHandler { | |
public DiscardOldestPolicy() { } | |
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { | |
if (!e.isShutdown()) { | |
e.getQueue().poll(); // 出队操作,但是这对于 SynchronousQueue 来说毫无意义 | |
e.execute(r); // 调用 execute 方法,如果还没有线程空闲,就可以放入阻塞独队列 | |
} | |
} | |
} |
如果线程池的线程迟迟没有空闲,这两个方法就会无限递归下去,然后爆栈。
# 关闭线程池
遍历线程池中的所有线程,然后逐个调用线程的 interrupt 方法来中断线程。其中涉及到两个方法:
shutdown
方法:将线程池的状态设置为SHUTWDOWN
状态,正在执行的任务会继续执行下去,没有被执行的线程则中断。在队列中的任务会被继续执行。shutdownNow
则是将线程池的状态设置为STOP
,所有线程都被中断,正在执行的任务则被停止,没被执行的任务会形成一个任务列表被返回。
public class Main { | |
public static void main(String[] args) { | |
ExecutorService executorService = Executors.newFixedThreadPool(5); | |
for (int i = 0; i < 5; i++) { | |
final int j = i; | |
executorService.execute(()->f(j)); | |
} | |
//shutdown,会将队列中的任务继续运行,而 shutdownNow 不会 | |
executorService.shutdownNow(); | |
System.out.println(executorService.isShutdown()); | |
System.out.println(executorService.isTerminated()); | |
} | |
public static void f(int i) { | |
System.out.println("第" + i + "个任务执行开始..."); | |
// 输入一个字符串并打印 | |
try { | |
Thread.sleep(10000); | |
} catch (InterruptedException e) { | |
System.out.println("第" + i + "个任务执行被中断..."); | |
return; | |
} | |
System.out.println("第" + i + "个任务执行结束..."); | |
} | |
} |
需要注意的是,中断调用的方法是 Thread.interrupt()
,这种方法的作用有限,如果线程中没有 sleep
、 wait
、 Condition
、定时锁等应用, interrupt()
方法是无法中断当前的线程的。所以, ShutdownNow()
并不代表线程池就一定立即就能退出,它可能必须要等待所有正在执行的任务都执行完成了才能退出。
你可以测试一下,把 f(int i)
方法的 Thread.Sleep(1000);
换成 new Scanner(System.in).nextLine()
,那么即使调用了 shutdownNow()
,程序也不会停止。
你可能注意到,
f
接收的参数被我设置成了final
,为啥需要设置为 final,你可以查一下,也可以看看这篇文章
# 自定义拒绝策略
通过查看源码可知, ThreadPoolExecutor
接受的拒绝策略参数是 RejectedExecutionHandler
,所以我们可以自己写一个决绝策略的类,继承 RejectedExecutionHandler
接口:
public interface RejectedExecutionHandler { | |
void rejectedExecution(Runnable r, ThreadPoolExecutor executor); | |
} |
写一个实现类:
public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler { | |
@Override | |
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { | |
// 阿巴阿巴操作 | |
System.out.println(r.toString() + " is rejected"); | |
} | |
} |
测试一下:
public class Main { | |
public static void main(String[] args) { | |
ExecutorService executorService = new ThreadPoolExecutor(5,5,12, | |
TimeUnit.SECONDS,new SynchronousQueue<>(), | |
new RejectedExecutionHandlerImpl()); | |
for (int i = 0; i < 6; i++) { | |
final int j = i; | |
executorService.execute(()->f(j)); | |
} | |
} | |
public static void f(int i) { | |
System.out.println("第" + i + "个任务执行开始..."); | |
try { | |
Thread.sleep(10000); | |
} catch (InterruptedException e) { | |
System.out.println("第" + i + "个任务执行被中断..."); | |
return; | |
} | |
System.out.println("第" + i + "个任务执行结束..."); | |
} | |
} |
# 源码解析
ThreadPoolExecutor
的重要属性其实就是构造函数传入的那几个参数,其他重要的还有:
// 原子操作类,避免出现并发问题,低 29 位表示线程池中线程数,通过高 3 位表示线程池的运行状态 | |
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); | |
//worker 的集合,用 set 来存放 | |
private final HashSet<Worker> workers = new HashSet<Worker>(); |
看一下线程池运行状态以及转换:
# 任务执行
execute –> addWorker –>runworker (getTask)
线程池的工作线程通过 Woker 类实现,在 ReentrantLock 锁的保证下,把 Woker 实例插入到 HashSet 后,并启动 Woker 中的线程。 从 Woker 类的构造方法实现可以发现:线程工厂在创建线程 thread 时,将 Woker 实例本身 this 作为参数传入,当执行 start 方法启动线程 thread 时,本质是执行了 Worker 的 runWorker 方法。 firstTask 执行完成之后,通过 getTask 方法从阻塞队列中获取等待的任务,如果队列中没有任务,getTask 方法会被阻塞并挂起,不会占用 cpu 资源。
public void execute(Runnable command) { | |
if (command == null) | |
throw new NullPointerException(); | |
int c = ctl.get(); | |
if (workerCountOf(c) < corePoolSize) { | |
// 线程池的当前线程数小于 corePoolSize,执行 addWorker 创建新线程执行 command 任务 | |
if (addWorker(command, true)) | |
return; | |
c = ctl.get(); | |
} | |
// double check: c, recheck | |
// 线程池处于 RUNNING 状态,把提交的任务成功放入阻塞队列中 | |
if (isRunning(c) && workQueue.offer(command)) { | |
int recheck = ctl.get(); | |
// 回滚到入队操作前,即倘若线程池 shutdown 状态,就 remove (command) | |
// 如果线程池没有 RUNNING,成功从阻塞队列中删除任务,执行 reject 方法处理任务 | |
if (! isRunning(recheck) && remove(command)) | |
reject(command); | |
// 线程池处于 running 状态,但是没有线程,则创建线程 | |
else if (workerCountOf(recheck) == 0) | |
addWorker(null, false); | |
} | |
// 往线程池中创建新的线程失败,则 reject 任务 | |
else if (!addWorker(command, false)) | |
reject(command); | |
} |
在多线程环境下,线程池的状态时刻在变化,而 ctl.get()
是非原子操作,很有可能刚获取了线程池状态后线程池状态就改变了。判断是否将 command
加入 workque
是线程池之前的状态。倘若没有 double check,万一线程池处于非 running
状态 (在多线程环境下很有可能发生),那么 command 永远不会执行。在双重检查下,如果线程池状态发生变化,就可以对这个任务执行拒绝策略。
# 任务关闭
shutdown()
和 shutdownNow()
,上文也讲过了,嘿嘿😘
# 参考
https://www.yuque.com/qingkongxiaguang/javase/bnoif5#ef6ebd71
https://pdai.tech/md/java/thread/java-thread-x-juc-executor-ThreadPoolExecutor.html
https://www.iteye.com/blog/justsee-999189
《Java 并发编程的艺术》第 10 章
深入理解高并发编程(第 1 版)