首页 > temp > 简明python教程 >
-
简单看看ThreadPoolExecutor原理(3)
四.execute方法
首先看下面这个图,ThreadPoolExecutor分为由三部分组成,第一部分是workers,这是一个HashSet<Worker> workers = new HashSet<Worker>(),最开始线程池中线程数量小于核心线程数量的时候,会直接把任务封装成Worker丢到这个集合中来,每一个Worker都实现了Runnable接口,里面还保存了一个Runnable类型的属性和Thread类型的属性,线程池中的创建的Thread实例时传进去的是Worker,所以在线程启动调用start方法的时候,实际上是调用Worker的run方法;
第二部分指的是BlockingQueue<Runnable> workQueue,这是一个阻塞队列,存放的是原始的任务,没有封装成Worker,这个队列只有在线程池中线程数量多于核心线程数量的时候才会启用;
第三部分指的是RejectedExecutionHandler handler,这是一个拒绝策略,在比如队列满了,线程数量也到极限了,这时还有任务在往线程池中丢,此时就可以对这些多余的任务做处理;
顺便再说一下线程池有几种状态:
RUNNING:线程池可以接收新任务并且处理阻塞队列中的任务,任务会直接扔到workers中去,高三位是111;
SHUTDOWN:线程池拒绝新任务但是处理阻塞队列中的任务,这里说明任务已经很多了,阻塞队列已经满了,而且线程数量已经达到最大值了,高三位是000;
STOP:线程池拒绝新任务并且抛弃阻塞队列中的任务,同时中断正在处理的任务,这里也就是说强行停止线程池,高三位是001;
TIDING:所有的任务都执行完毕了,包括阻塞队列中的任务也执行完了,此时线程池中所有线程都处于空闲状态,将要调用terminated方法,高三位是010;
TERMINATED:终止状态,terminated方法调用完成以后的状态 ,高三位是011;
public void execute(Runnable command) { //如果任务是空,就抛出异常 if (command == null) throw new NullPointerException(); //获取原子变量,32位的,分为高3位和低29位,前面已经介绍了 int c = ctl.get(); //[1]workerCountOf方法获取第29位,也就是线程池中线程数量,当数量小于设置的核心数量时,就开启新的线程执行任务 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } //[2]如果线程池处于RUNNING状态,而且此时线程池中线程的数量已经达到了核心数量,于是就把任务丢到阻塞队列中 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); //当前线程池状态不是RUNNING就从队列中删除任务,执行拒绝策略 if (! isRunning(recheck) && remove(command)) reject(command); //如果线程池中没有线程,那就添加一个空闲线程不断的去任务队列中轮询任务 else if (workerCountOf(recheck) == 0) addWorker(null, false); } //[3]能到这里说明阻塞队列也已经满了,然后尝试着再新增线程去处理任务,直到线程数量到达最大数量 //如果新增线程失败,就执行拒绝策略 else if (!addWorker(command, false)) reject(command); }
最关键的就是addWorker方法,这个方法主要是在线程池中添加一个新的线程;
//这个方法分为两个部分,首先是通过双重循环使用CAS增加线程数,也就是将那个原子变量的低29位加一 //然后就是将实现了Runnable接口的任务添加到一个集合HashSet<Worker> workers中,然后启动任务执行 private boolean addWorker(Runnable firstTask, boolean core) { //goto标志,注意,这个retry一定要在for循环前面,相当于给这个for循环取个名字,在该for循环里面如果 //break retry表示跳出这个for循环;而continue retry表示重新进入到这个for循环 retry: for (;;) { int c = ctl.get(); //线程池的状态,也就是ctl原子变量的高三位 int rs = runStateOf(c); //这里的作用就是线程池创建线程失败的情况; //这里分为三种情况这里会返回false: //第一种是线程池状态是STOP,TIDING,TERMINATED //第二种是线程池状态是SHUTDOWN并且已经有了第一个任务 //第三种是线程池状态是SHUTDOWN而且任务队列为空 if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) return false; //CAS增加线程数 for (;;) { //获取线程池数量 int wc = workerCountOf(c); //如果线程池中线程数量大于最大容量或者是线程数大于核心线程数量就返回false if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //线程数量没有超过限制,就CAS使得ctl的低29位的值加一,CAS成功就跳出这两个for循环 if (compareAndIncrementWorkerCount(c)) break retry; //CAS失败的话,就查看线程池状态是否发生变化,变化则跳出两个for循环,然后再重新进入for循环获取线程池状态; //线程池状态没有变化,就在此for循环中继续CAS c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; } } //假如到了这里,说明上面CAS将原子变量低29位加一已经成功了,下面就该新建线程真正的执行任务了 boolean workerStarted = false; boolean workerAdded =