首页 > Python基础教程 >
-
Java多线程信号量同步类CountDownLatch与Semaphore
信号量同步是指在不同线程之间,通过传递同步信号量来协调线程执行的先后次序。CountDownLatch是基于时间维度的Semaphore则是基于信号维度的。
1:基于执行时间的同步类CountDownLatch
例如现有3台服务器,需编写一个获取各个服务器状态的接口,准备开三个子线程每个线程获取一台服务器状态后统一返回三台服务器状态。主线程内定义计数器为3的CountDownLatch实例,各个子线程添加CountDownLatch实例引用,子线程执行完后对CountDownLatch进行countDown。主线程调用CountDownLatch实例的await方法等待所有子线程执行完后返回结果。不考虑异常情况的代码示例如下。
public class Main { public static void main(String[] args) throws InterruptedException { CountDownLatch count = new CountDownLatch(3); Thread getServer1Status = new GetDataStatusThread("服务器1", count); Thread getServer2Status = new GetDataStatusThread("服务器2", count); Thread getServer3Status = new GetDataStatusThread("服务器3", count); getServer1Status.start(); getServer2Status.start(); getServer3Status.start(); //await 使当前线程等待直至CountDownLatch的计数为0,除非线程中断 //count.await(); //await(long timeout, TimeUnit unit) 使当前线程等待直至CountDownLatch的计数为0,除非线程中断或经过指定的等候时间 count.await(10,TimeUnit.SECONDS); System.out.println("所有服务器状态获取完成"); } } class GetDataStatusThread extends Thread { private final CountDownLatch count; public GetDataStatusThread(String threadName, CountDownLatch count) { this.setName(threadName); this.count = count; } @Override public void run() { System.out.println("获取" + this.getName() + "状态成功"); //递减CountDownLatch的计数,如果计数达到零,则释放所有等待的线程 count.countDown(); } }
注意:CountDownLatch的await方法建议使用带超时间的。不使用带超时时间await的线程若计数器初始值设置的值达不到countDown(递减计数器计数)次数则该线程会一直等待至计数为0,除非线程中断
(例如子线程执行过程中出现异常执行不到countDown方法,顺便补充若子线程会抛出异常且该异常没有被主线程捕获到可通过线程方法setUncaughtExceptionHandler()捕获)。
2:基于空闲信号的同步类Semaphore
Semaphore可看作一个管理“许可证”的池,创建Semaphore实例时指定许可证数量,所有包含Semaphore实例引用的线程运行时通过acquire方法获取许可证,运行完成后通过release方法释放许可证。获取不到许可证等线程直到获取到空闲许可证才会执行。如下代码所示:某景区只有两个买票窗口(许可池大小)所有游客排队进行买票,准备买票的游客通过acquire占据当前窗口买票完成准备离开通过release方法表示当前窗口已空闲。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
public class Window { public static void main(String[] args) { Semaphore semaphore = new Semaphore( 2 ); for ( int i = 1 ; i <= 6 ; i++) { new ByTicketThread( "客户" + i, semaphore).start(); } } } class ByTicketThread extends Thread { private final Semaphore semaphore; public ByTicketThread(String threadName, Semaphore semaphore) { this .setName(threadName); this .semaphore = semaphore; } @Override public void run() { try { semaphore.acquire(); System.out.println( this .getName() + "正在买票。" ); Thread.sleep( 1000 ); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); System.out.println( this .getName() + "买票完成。" ); } } } |