重点内容
- 线程池的使⽤
- 创建线程池
- 提交任务
- 关闭线程池
- 线程池的原理
- 合理配置线程池
- 线程池的监控
1.线程池的创建#
new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
- corePoolSize:线程池的基本大小。 提前调用prestartAllCoreThreads(),会把所有的基本线程启动 。
- workQueue: ⽤于保存等待执⾏的任务的阻塞队列。
- ArrayBlockingQueue 基于数组实现的(先进先出)。
- LinkedBlockingQueue 吞吐量要高于ArrayBlockingQueue。
- SynchronousQueue 吞吐量要高于LinkedBlockingQueue 不存储元素的阻塞队列,得等一个线程做移除操作才能继续进行,要不会一直阻塞。
- PriorityBlockingQueue 具有优先级的无限阻塞队列。
- maximumPoolSize: 线程池允许创建的最⼤线程数。
- threadFactory: ⽤于设置创建线程的工厂可以使用谷歌的开源方法。
- handler: 饱和策略,阻塞队列和我们的线程的创建数都满了的时候就会饱和选择一个策略对新提交的策略进行处理。
- AbortPolicy 直接抛出异常。
- CallerRunsPolicy 只用调用者所在的线程来处理任务。
- DiscardOldestPolicy 丢弃队列里最近的一个任务。
- DiscardPolicy 直接丢弃。
- ⾃定义 自己定义一个处理方式。
- keepAliveTime:线程池的⼯作线程空闲后,保持存活的时间。
- unit:线程活动保持时间的单位。
2.提交任务#
execute:⽤于提交不需要返回值的任务
submit:⽤于提交需要返回值的任务
shutdown:终止的时候会抛出异常
shutdownNow:中止的时候不会抛出异常
- 线程池测试代码
/** @Classname ThreadPoolDemo @Author XW @Date 2021/12/17 23:15 */
public class ThreadPoolDemo {
private static ThreadFactory namedThreadFactory =
new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();
private static ExecutorService pool =
new ThreadPoolExecutor(
2,
20,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(2),
namedThreadFactory,
new ThreadPoolExecutor.AbortPolicy());
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
pool.execute(new NoResultThread(i));
/**
* submit 验证 Future<String> future = pool.submit(new ResultThread()); try {
* System.out.println("main thread get result: " + future.get()); // future.get(100,
* TimeUnit.MICROSECONDS); } catch (Exception e) { e.printStackTrace(); }
*/
}
// shutdown 验证
System.out.println("执行shutdown! ");
pool.shutdown(); // 会继续执行并且完成所有未执行的任务, 新提交的任务会被reject(通过reject策略)
for (int i = 10; i < 12; i++) {
pool.execute(new NoResultThread(i));
}
/**
* shutdownnow 验证 System.out.println("执行shutdownnow! "); List<Runnable> runnableList =
* pool.shutdownNow(); // 会清除所有未执行的任务并且在运行线程上调用interrupt()
*/
System.out.println("pool shutdown state: " + pool.isShutdown());
while (true) {
if (pool.isTerminated()) {
System.out.println("pool terminated!");
break;
} else {
System.out.println("pool terminated state: " + pool.isTerminated());
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private static class NoResultThread implements Runnable {
private int taskNum;
public NoResultThread(int taskNum) {
this.taskNum = taskNum;
}
@Override
public void run() {
System.out.println("线程 " + Thread.currentThread().getName() + " 开始执行任务 " + this.taskNum);
try {
Thread.sleep(1000);
System.out.println("线程 " + Thread.currentThread().getName() + " 执行完任务 " + this.taskNum);
} catch (InterruptedException e) {
System.out.println(
"线程 "
+ Thread.currentThread().getName()
+ " 在执行任务 "
+ this.taskNum
+ " 时被中断 :"
+ e.getMessage());
}
}
}
private static class ResultThread implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println(
Thread.currentThread().getState() + "----------" + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Thread.currentThread().getName();
}
}
}
3.线程池的实现原理#
首先会判断corePoolSize核心线程池是否已经满了,没满就直接创建线程执行任务,满了再去判断队列是否满了,队列没有满的话在把任务放在队列里面,队列如果满的话,会将当前的线程数量跟maximumPoolSize进行对比如果没满的话就创建线程执行任务,maximumPoolSize也满了话就按照策略(handler)处理无法执行的任务。注意线程池只要创建线程就会获取全局锁。
线程会根据worker去线程池里面拿任务
- 线程池execute的源码
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException();
}
/** ctl记录着workCount和runState */
int c = ctl.get();
/** 第一步: 如果线程池中的线程数量小于核心线程数,那么创建线程并执行*/
if (workerCountOf(c) < corePoolSize) { // workerCountOf(c): 获取当前活动的线程数
/**
* 在线程池中新建一个新的线程
* command:需要执行的Runnable线程
* true:新增线程时,【当前活动的线程数】是否 < corePoolSize
* false:新增线程时,【当前活动的线程数】是否 < maximumPoolSize
*/
if (addWorker(command, true)) {
// 添加新线程成功,则直接返回。
return;
}
// 添加新线程失败,则重新获取【当前活动的线程数】
c = ctl.get();
}
/**
* 第二步:如果当前线程池是运行状态 并且 任务添加到队列成功
* (即:case2: 如果workCount >= corePoolSize,创建线程往workQueue添加线程任务,等待执行)
*/
// BlockingQueue<Runnable> workQueue 和 Runnable command
if (isRunning(c) && workQueue.offer(command)) { // 添加command到workQueue队列中。
// 重新获取ctl
int recheck = ctl.get();
// 再次check一下,当前线程池是否是运行状态,如果不是运行时状态,则把刚刚添加到workQueue中的command移除掉,并调用拒绝策略
if (!isRunning(recheck) && remove(command)) {
reject(command);
} else if (workerCountOf(recheck) == 0) { // 如果【当前活动的线程数】为0,则执行addWork方法
/**
* null:只创建线程,但不去启动
* false:添加线程时,根据maximumPoolSize来判断
*
* 如果 workerCountOf(recheck) > 0, 则直接返回,在队列中的command稍后会出队列并且执行
*/
addWorker(null, false);
}
}
/**
* 第三步:满足以下两种条件之一,进入第三步判断语句
* case1: 线程池不是正在运行状态,即:isRunning(c)==false
* case2: workCount >= corePoolSize 并且 添加workQueue队列失败。即:workQueue.offer(command)==false
*
* 由于第二个参数传的是false,所以如果workCount < maximumPoolSize,则创建执行线程;否则,进入方法体执行reject(command)
*/
else if (!addWorker(command, false)) {
// 执行线程创建失败的拒绝策略
reject(command);
}
}
- 线程池addWorker的源码
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
/** 步骤一:试图将workerCount+1 */
for (; ; ) {
int c = ctl.get();
// 获得运行状态runState
int rs = runStateOf(c);
/**
* 只有如下两种情况可以新增worker,继续执行下去:
* case one: rs == RUNNING
* case two: rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()
*/
if (rs >= SHUTDOWN && // 即:非RUNNING状态(请查看isRunning()方法)。线程池异常,表示不再去接收新的线程任务了,返回false
/**
* 当线程池是SHUTDOWN状态时,表示不再接收新的任务了,所以:
* case1:如果firstTask!=null,表示要添加新任务,则:新增worker失败,返回false。
* case2:如果firstTask==null并且workQueue为空,表示队列中的任务已经处理完毕,不需要添加新任务了。
* 则:新增worker失败,返回false
*/
!(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) {
return false;
}
for (; ; ) {
// 获得当前线程池里的线程数
int wc = workerCountOf(c);
/**
* 满足如下任意情况,则新增worker失败,返回false
* case1:大于等于最大线程容量,即:int CAPACITY = 00011111111111111111111111111111 = 536870911(十进制)
* case2:当core是true时:>= 核心线程数
* 当core是false时:>= 最大线程数
*/
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) {
return false;
}
// 当前工作线程数加1
if (compareAndIncrementWorkerCount(c)) {
break retry; // 成功加1,则跳出retry标识的这两层for循环
}
// 如果线程数加1操作失败,则获取当前最新的线程池运行状态
c = ctl.get();
// 判断线程池运行状态(rs)是否改变;如果不同,则说明方法处理期间线程池运行状态发生了变化,重新获取最新runState
if (runStateOf(c) != rs) {
continue retry; // 跳出内层for循环,继续从第一个for循环执行
}
}
}
/**
* 步骤二:workerCount成功+1后,创建Worker,加入集合workers中,并启动Worker线程
*/
boolean workerStarted = false; /** 用于判断新的worker实例是否已经开始执行Thread.start() */
boolean workerAdded = false; /** 用于判断新的worker实例是否已经被添加到线程池的workers队列中 */
Worker w = null; // AQS.Worker
try {
w = new Worker(firstTask); /** 创建Worker实例,每个Worker对象都会针对入参firstTask来创建一个线程。 */
final Thread t = w.thread; /** 从Worker中获得新建的线程t */
if (t != null) {
final ReentrantLock mainLock = this.mainLock; /** 加重入锁 */
/** ----------lock() 尝试加锁操作!!获得锁后继续执行,没获得则等待直到获得锁为止---------- */
mainLock.lock();
try {
int rs = runStateOf(ctl.get()); /** 获得线程池当前的运行状态runStatus */
/**
* 满足如下任意条件,即可向线程池中添加线程:
* case1:线程池状态为RUNNING。(请查看isRunning()方法)
* case2:线程池状态为SHUTDOWN并且firstTask为null。
*/
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) { /** 因为t是新构建的线程,还没有启动。所以,如果是alive状态,说明已经被启动了,则抛出异常 */
throw new IllegalThreadStateException();
}
workers.add(w); /** workers中保存线程池中存在的所有work实例集合 */
int s = workers.size();
if (s > largestPoolSize) { /** largestPoolSize用于记录线程池中曾经存在的最大的线程数量 */
largestPoolSize = s;
}
workerAdded = true;
}
} finally {
mainLock.unlock(); /** ----------unlock 解锁操作!!---------- */
}
if (workerAdded) {
t.start(); /** 开启线程,执行Worker.run() */
workerStarted = true;
}
}
} finally {
if (!workerStarted) { // 如果没有开启线程
addWorkerFailed(w); // 往线程池中添加worker失败了
}
}
return workerStarted;
}
- 线程池runWorker的源码
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
/**
* 如果线程池正在停止,请确保线程被中断;否则,请确保线程不被中断。
* 这需要在第二种情况下重新检查以处理shutdownNow竞赛,同时清除中断
*
* 同时满足如下两个条件,则执行wt.interrupt()
* 1>线程状态为STOP、TIDYING、TERMINATED 或者 (当前线程被中断(清除中断标记)并且线程状态为STOP、TIDYING、TERMINATED)
* 2>当前线程wt没有被标记中断
*/
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
&& !wt.isInterrupted()) {
wt.interrupt();
}
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run(); /** 真正做事儿的地方了 */
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
4.合理配置线程池#
任务的性质
- CPU密集型 : N cpu + 1 配置尽可能小的线程,线程数要少一点,减少cpu频繁的上下文切换,提高cpu的利用率
- IO 密集型 :2 * N cpu 需要配置尽可能多的线程,这样才能保证cpu能被充分的利用
- 混合型 :拆分成CPU密集型和IO密集型
- N = Runtime.getRuntime().availableProcessors()
任务的优先级 :PriorityBlockingQueue
任务的执⾏时间
- 不同规模的线程池
- PriorityBlockingQueue 让执行时间比较短的线程先执行
任务的依赖性
- 增加线程数量
- 使⽤有界队列保证系统的稳定性
5.线程池的监控#
taskCount 任务的数量
completedTaskCount 运行的过程中完成的任务数量
largestPoolSize 曾经创建过的最大的线程数量
getPoolSize 线程数量
getActiveCount 获取活动的线程数
扩展线程池:beforeExecute、afterExecute 在线程执行前,执行后做点什么