前言#
当项目中有频繁创建线程的场景时,往往会用到线程池来提高效率。所以,线程池在项目开发过程中的出场率是很高的。
那线程池是怎么工作的呢?它什么时候创建线程对象,如何保证线程安全...
什么时候创建线程对象#
当实例化线程池对象时,并没有预先创建corePoolSize
个线程对象,而是在调用execute
或submit
提交任务时,才会创建线程对象。
工作流程#
public void execute(Runnable command) {
int c = ctl.get();
// 1. 如果核心线程数没有用完,则让核心线程执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2. 核心线程用完后,尝试添加到等待队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3. 等待队列满后,尝试创建临时线程执行任务;
// 如果创建临时线程失败,即达到了最大线程数,则采用拒绝策略处理
else if (!addWorker(command, false))
reject(command);
}
如何保存线程池状态#
// 线程池采用状态压缩的思想,通过 32 个 bit 位来存储线程池状态和工作线程数量
// 其中,高 3 位存储线程池状态,低 29 位存储工作线程数量
// 表示工作线程数量位数
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程池最大容量
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 通过线程安全的 atomic 对象来存储线程池状态
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int ctlOf(int rs, int wc) { return rs | wc; }
如何保证线程安全#
1、对于一些需要线程共享的成员变量,通过volatile
修饰
2、线程池状态和工作线程数的修改通过AtomicInteger
类自带的线程安全方法实现
3、工作线程Worker
通过线程不安全的HashSet
存储,每次操作HashSet
时都通过一个可重入锁ReentrantLock
保证线程安全,ReentrantLock
还用来保证访问一些线程不安全的变量,比如
/**
* Tracks largest attained pool size. Accessed only under
* mainLock.
*/
private int largestPoolSize;
/**
* Counter for completed tasks. Updated only on termination of
* worker threads. Accessed only under mainLock.
*/
private long completedTaskCount;
4、Worker
实现了AQS
类,保证线程安全地操作工作线程Worker
。Worker
通过设置AQS#state
来实现加锁和释放锁,当state == 0
时,表示没有上锁;当state == 1
时,表示上锁,通过CAS
方式实现线程安全
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
内置线程池#
1、SingleThreadPool
单线程池,只使用一个线程执行任务,阻塞队列LinkedBlockingQueue
最大容量为Integer.MAX_VALUE
。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
2、FixedThreadPool
固定大小的线程池,当创建的线程数量达到maxPoolSize
,就不再创建线程,阻塞队列LinkedBlockingQueue
最大容量为Integer.MAX_VALUE
。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
3、CachedThreadPool
缓存线程池,核心线程数量为0
,所以任务都通过创建临时线程来执行,临时线程空闲回收时间为60
秒,当线程池空闲时,临时线程都会被回收,不耗费资源。
阻塞队列SynchronousQueue
不会缓存任务,也就是说,新任务进来后会直接被调度执行,如果没有可用的线程了,则会创建新的线程,如果线程数达到maxPoolSize
,即Integer.MAX_VALUE
,就执行拒绝策略。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
4、ScheduleThreadPool
定时调度线程池,通过DelayedWorkQueue
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
5、WorkStealingThreadPool
工作窃取线程池,当一个处理器忙时,空闲的处理器可以窃取该处理器后续的任务执行。通过ForkJoinPool
实现,可设置支持的并行级别。
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}