-
java并发---线程通信和线程池原理
一、线程的状态
线程的状态包括新建(初始状态)、就绪、运行、死亡(终止)、阻塞;
(1)简化版本
(2)结合java线程方法版本
(2)线程通信
(2.1)传统的线程通信
-
wait():导致当前线程等待,直到其他线程调用该同步监视器的notify()方法或notifyAll()方法来唤醒该线程。该wait()方法有3种形式——无时间参数的wait(一直等待,直到其他线程通知),带毫秒参数的wait和带毫秒、毫微秒参数的wait(这两种方法都是等待指定时间后自动苏醒)。调用wait()方法的当前线程会释放对该同步监视器的锁定。
-
notify():唤醒在此同步监视器上等待的单个线程。如果所有线程都在此同步监视器上等待,则会选择唤醒其中一个线程。选择是任意性的。只有当前线程放弃对该同步监视器的锁定后(使用wait()方法),才可以执行被唤醒的线程。
-
notifyAll():唤醒在此同步监视器上等待的所有线程。只有当前线程放弃对该同步监视器的锁定后,才可以执行被唤醒的线程。
eg:
package test; public class ThreadComm { public static boolean WASHED = false; public static void wash(int i) { System.out.println(i + "已经洗手"); WASHED = true; } public static void eat(int i) { System.out.println(i + "已经吃饭"); WASHED = false; } public static void main(String[] args) { // wash线程 for (int i = 0; i <= 5; i++) { int j = i; new Thread(new Runnable() { @Override public void run() { doWash(j); doEat(j); } private synchronized void doWash(int i) { if (!WASHED) {// 如果还没洗手,就执行洗手操作,否则,阻塞当前线程,直到吃饭完成 ThreadComm.wash(i); notifyAll(); } else { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } }// doWash private synchronized void doEat(int i) { if (WASHED) {// 已经洗完手,唤起当前吃饭线程 ThreadComm.eat(i); notifyAll(); } else { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } }// doEat }).start(); }// for } }
(2.2)使用condition控制线程通信
如果程序不使用synchronized关键字来保证同步,而是直接使用Lock对象来保证同步,则系统中不存在隐式的同步监视器,也就不能使用wait()、notify()、notifyAll()方法进行线程通信了。当使用Lock对象来保证同步时,Java提供了一个Condition类来保持协调,使用Condition可以让那些已经得到Lock对象却无法继续执行的线程释放Lock对象,Condition对象也可以唤醒其他处于等待的线程。
Condition实例被绑定在一个Lock对象上。要获得特定Lock实例的Condition实例,调用Lock对象的newCondition()方法即可。Condition类提供了如下3个方法:
await():类似于隐式同步监视器上的wait()方法,导致当前线程等待,直到其他线程调用该Condition的signal()方法或signalAll()方法来唤醒该线程。该await()方法有更多变体,如longawaitNanos(long nanosTimeout)、void awaitUninterruptibly()、awaitUntil(Datedeadline)等,可以完成更丰富的等待操作。
signal():唤醒在此Lock对象上等待的单个线程。如果所有线程都在该Lock对象上等待,则会选择唤醒其中一个线程。选择是任意性的。只有当前线程放弃对该Lock对象的锁定后(使用await()方法),才可以执行被唤醒的线程。
signalAll():唤醒在此Lock对象上等待的所有线程。只有当前线程放弃对该Lock对象的锁定后,才可以执行被唤醒的线程。
eg:
package test; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class ConditionTread { private static ReentrantLock lock = new ReentrantLock(); private static Condition cond = lock.newCondition(); public static boolean WASHED = false; public static void wash(int i) { WASHED = true; System.out.println(i + "已经洗手"); } public static void eat(int i) { WASHED = false; System.out.println(i + "已经吃饭"); } public static void main(String[] args) { // wash线程 for (int i = 0; i <= 5; i++) { int j = i; new Thread(new Runnable() { @Override public void run() { doWash(j); doEat(j); } private void doWash(int i) { lock.lock(); try { if (!WASHED) {// 如果还没洗手,就执行洗手操作,否则,阻塞当前线程,直到吃饭完成 ConditionTread.wash(i); cond.signalAll();//唤醒其他线程 } else { try { cond.await(); } catch (InterruptedException e) { e.printStackTrace(); } } } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } }// doWash private void doEat(int i) { lock.lock(); try { if (WASHED) {// 已经洗完手,唤起当前吃饭线程 ConditionTread.eat(i); cond.signalAll(); } else { try { cond.await(); } catch (InterruptedException e) { e.printStackTrace(); } } } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } }// doEat }).start(); }// for } }
(2.3)使用阻塞队列(BlockingQueue)控制线程通信
Java 5提供了一个BlockingQueue接口(Queue的子接口),主要用途是作为线程同步的工具。BlockingQueue具有一个特征:当生产者线程试图向BlockingQueue中放入元素时,如果该队列已满,则该线程被阻塞;当消费者线程试图从BlockingQueue中取出元素时,如果该队列已空,则该线程被阻塞。程序的两个线程通过交替向BlockingQueue中放入元素、取出元素,即可很好地控制线程的通信。BlockingQueue提供两个支持阻塞的方法:
put(E e):尝试把E元素放入BlockingQueue中,如果该队列的元素已满,则阻塞该线程。
take():尝试从BlockingQueue的头部取出元素,如果该队列的元素已空,则阻塞该线程。
BlockingQueue继承了Queue接口,也可使用Queue接口中的方法:
在队列尾部插入元素。包括add(E e)、offer(E e)和put(E e)方法,当该队列已满时,这3个方法分别会抛出异常、返回false、阻塞队列。
在队列头部删除并返回删除的元素。包括remove()、poll()和take()方法。当该队列已空时,这3个方法分别会抛出异常、返回false、阻塞队列。
在队列头部取出但不删除元素。包括element()和peek()方法,当队列已空时,这两个方法分别抛出异常、返回false。
eg:
package test; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class ConditionTread { private static BlockingQueue<String> bq = new ArrayBlockingQueue<>(10); public static void wash(int i) { System.out.println(i + "已经洗手"); } public static void eat(int i) { System.out.println(i + "已经吃饭"); } public static void main(String[] args) { // wash线程 for (int i = 0; i <= 5; i++) { int j = i; new Thread(new Runnable() { @Override public void run() { doWash(j); doEat(j); } private void doWash(int i) { // 如果还没洗手,就执行洗手操作,否则,阻塞当前线程,直到吃饭完成 ThreadComm.wash(i); try { bq.put(i + ""); } catch (InterruptedException e) { e.printStackTrace(); } }// doWash private void doEat(int i) { try { if (bq.take()!=null) {// 已经洗完手,唤起当前吃饭线程 ThreadComm.eat(i); try { bq.take(); } catch (InterruptedException e) { e.printStackTrace(); } } } catch (InterruptedException e) { e.printStackTrace(); } }// doEat }).start(); }// for } }
二、线程池
(1)常用线程池的类结构
普通线程执行完,就会进入TERMINATED销毁掉,而线程池就是创建一个缓冲池存放线程,执行结束以后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等候下次任务来临,这使得线程池比手动创建线程有着更多的优势:
- 降低系统资源消耗,通过重用已存在的线程,降低线程创建和销毁造成的消耗;
- 提高系统响应速度,当有任务到达时,通过复用已存在的线程,无需等待新线程的创建便能立即执行;
- 方便线程并发数的管控。因为线程若是无限制的创建,可能会导致内存占用过多而产生OOM;
- 节省cpu切换线程的时间成本(需要保持当前执行线程的现场,并恢复要执行线程的现场)。
- 提供更强大的功能,延时定时线程池。(eg:ScheduledThreadPoolExecutor可以代替Timer执行定时任务)
(2)线程池的工作状态
- RUNNING:初始化状态是RUNNING。线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0。RUNNING状态下,能够接收新任务,以及对已添加的任务进行处理。
- SHUTDOWN:SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。
- STOP:不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。调用线程池的shutdownNow()接口时,线程池由(RUNNING 或 SHUTDOWN ) -> STOP。
注意:运行中的任务还会打印,直到结束,因为调的是Thread.interrupt
- TIDYING:所有的任务已终止,队列中的”任务数量”为0,线程池会变为TIDYING。线程池变为TIDYING状态时,会执行钩子函数terminated(),可以通过重载terminated()函数来实现自定义行为。
- TERMINATED:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED
(3)线程池原理
- 添加任务,如果线程池中线程数没达到coreSize,直接创建新线程执行
- 达到core,放入queue
- queue已满,未达到maxSize继续创建线程
- 达到maxSize,根据reject策略处理
- 超时后,线程被释放,下降到coreSize
(4)线程池源码分析
1)线程池是如何保证线程不被销毁的呢?
如果队列中没有任务时,核心线程会一直阻塞在获取任务的方法,直到返回任务。而任务执行完后,又会进 下一轮 work.runWork()中循环
验证:秘密就藏在核心源码里 ThreadPoolExecutor.getTask()
//work.runWork(): while (task != null || (task = getTask()) != null) //work.getTask(): boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
2)那么线程池中的线程会处于什么状态?
答案:RUNNABLE,WAITING
验证:起一个线程池,放置一个任务sleep,debug查看结束前后的状态
//debug add watcher: ((ThreadPoolExecutor) poolExecutor).workers.iterator().next().thread.getState()
ThreadPoolExecutor poolExecutor = Executors.newFixedThreadPool(5); poolExecutor.execute(new Runnable() { public void run() { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } }); System.out.println("ok");
3)核心线程与非核心线程有区别吗?
答案:没有。被销毁的线程和创建的先后无关。即便是第一个被创建的核心线程,仍然有可能被销毁
验证:看源码,每个works在runWork的时候去getTask,在getTask内部,并没有针对性的区分当前work是否是核 心线程或者类似的标记。只要判断works数量超出core,就会调用poll(),否则take()
(5)线程池调优
1)Executors剖析
1.1)newCachedThreadPool
//core=0 //max=Integer //timeout=60s //queue=1 //也就是只要线程不够用,就一直开,不用就全部释放。线程数0‐max之间弹性伸缩 //注意:任务并发太高且耗时较长时,造成cpu高消耗,同时要警惕OOM return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
1.2)newFixedThreadPool
//core=max=指定数量 //timeout=0 //queue=无界链表 //也就是说,线程数一直保持制定数量,不增不减,永不超时 //如果不够用,就沿着队列一直追加上去,排队等候 //注意:并发太高时,容易造成长时间等待无响应,如果任务临时变量数据过多,容易OOM return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);
1.3)newSingleThreadExecutor
//core=max=1 //timeout=0 //queue=无界链表 //只有一个线程在慢吞吞的干活,可以认为是fix的特例 //适用于任务零散提交,不紧急的情况 new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
1.4)newScheduledThreadPool
//core=制定数 //max=Integer //timeout=0 //queue=DelayedWorkQueue(重点!) //用于任务调度,DelayedWorkQueue限制住了任务可被获取的时机(getTask方法),也就实现了时间控制 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory);
2)优化建议
2.1)corePoolSize
基本线程数,一旦有任务进来,在core范围内会立刻创建线程进入工作。所以这个值应该参考业务并发量在绝大多数时间内的并发情况。同时分析任务的特性。
高并发,执行时间短的,要尽可能小的线程数,如配置CPU个数+1,减少线程上下文的切换。因为它不怎么占时 间,让少量线程快跑干活。
并发不高、任务执行时间长的要分开看:如果时间都花在了IO上,那就调大CPU,如配置两倍CPU个数+1。不能 让CPU闲下来,线程多了并行处理更快。如果时间都花在了运算上,运算的任务还很重,本身就很占cpu,那尽量 减少cpu,减少切换时间。参考第一条。
如果高并发,执行时间还很长……
2.2)workQueue
任务队列,用于传输和保存等待执行任务的阻塞队列。这个需要根据你的业务可接受的等待时间。是一个需要权衡 时间还是空间的地方,如果你的机器cpu资源紧张,jvm内存够大,同时任务又不是那么紧迫,减少coresize,加大 这里。如果你的cpu不是问题,对内存比较敏感比较害怕内存溢出,同时任务又要求快点响应。那么减少这里。
2.3)maximumPoolSize
线程池最大数量,这个值和队列要搭配使用,如果你采用了无界队列,这个参数失效。同时要注意,队列盛满,同 时达到max的时候,再来的任务可能会丢失(下面的handler会讲)。 如果你的任务波动较大,同时对任务波峰来的时候,实时性要求比较高。也就是来的很突然并且都是着急的。那么 调小队列,加大这里。如果你的任务不那么着急,可以慢慢做,那就扔队列吧。 队列与max是一个权衡。队列空间换时间,多花内存少占cpu,轻视任务紧迫度。max舍得cpu线程开销,少占内存,给任务最快的响应。
2.4)keepaliveTime
线程存活保持时间,超出该时间后,线程会从max下降到core,很明显,这个决定了你养闲人所花的代价。如果 你不缺cpu,同时任务来的时间没法琢磨,波峰波谷的间隔比较短。经常性的来一波。那么实当的延长销毁时间, 避免频繁创建和销毁线程带来的开销。如果你的任务波峰出现后,很长一段时间不再出现,间隔比较久,那么要适当调小该值,让闲着不干活的线程尽快销毁,不要占据资源。
2.5)threadFactory(自定义展示实例)
线程工厂,用于创建新线程。threadFactory创建的线程也是采用new Thread()方式,threadFactory创建的线程名都具有统一的风格:pool-m-thread-n(m为线程池的编号,n为线程池内的线程编号)。如果需要自己定义线程 的某些属性,如个性化的线程名,可以在这里动手。一般不需要折腾它。
2.6)handler
线程饱和策略,当线程池和队列都满了,再加入线程会执行此策略。默认不处理的话会扔出异常,打进日志。这个与任务处理的数据重要程度有关。如果数据是可丢弃的,那不需要额外处理。如果数据极其重要,那需要在这里采取措施防止数据丢失,如扔消息队列或者至少详细打入日志文件可追踪。
优化总结:
1)线程池的线程数量设置不宜过大,因为一旦线程池的工作线程总数超过系统所拥有的处理器数量,就会导致过多的上下文切换。
2)慎用Executors,尤其如newCachedThreadPool。这个方法如果任务过多会无休止创建过多线 程,增加了上下文的切换。最好根据业务情况,自己创建线程池参数。
(6)开启线程方法总结
package test; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; import java.util.concurrent.ThreadPoolExecutor; public class Test { public static void main(String[] args) { // 继承Thread ThreadTest th1 = new ThreadTest(); th1.setName("thread"); th1.start(); // 实现Runnable RunnableTest runnable = new RunnableTest(); Thread th2 = new Thread(runnable); th2.setName("runnable"); th2.start(); // 实现Callable<> 接口,java5新增,可返回执行结果 CallableTest callable = new CallableTest(); FutureTask<Integer> future = new FutureTask<>(callable); new Thread(future, "callable").start(); try { Integer r = future.get(); System.out.println(r); } catch (Exception e) { e.printStackTrace(); } // 线程池 ExecutorService pool = Executors.newFixedThreadPool(10); ThreadPoolExecutor executor = (ThreadPoolExecutor) pool; executor.execute(new PoolHandler()); } } // 方式一 class ThreadTest extends Thread { @Override public void run() { for (int i = 0; i < 10; i++) { System.out.println(Thread.currentThread().getName() + ":" + i); } } } // 方式二 class RunnableTest implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { System.out.println(Thread.currentThread().getName() + ":" + i); } } } // 方式三 class CallableTest implements Callable<Integer> { @Override public Integer call() throws Exception { int sum = 0; for (int i = 0; i < 10; i++) { System.out.println(Thread.currentThread().getName() + ":" + i); sum += i; } return sum; } } /** * 方式四 线程池实现方式 * 注意:使用线程池时,使用实现Runnable的方式可避免java中单一继承造成的局限性 */ class PoolHandler implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { System.out.println(Thread.currentThread().getName() + ":" + i); } } }
查阅和参考了不少资料,感谢各路大佬分享,如需转载请注明出处,谢谢:https://www.cnblogs.com/huyangshu-fs/p/11374573.html