在这篇文章里,我们关注线程同步的话题。这是比多线程更复杂,稍不留意,我们就会“掉到坑里”,而且和单线程程序不同,多线程的错误是否每次都出现,也是不固定的,这给调试也带来了很大的挑战。
在这篇文章里,我们首先阐述什么是同步,不同步有什么问题,然后讨论可以采取哪些措施控制同步,接下来我们会仿照回顾网络通信时那样,构建一个服务器端的“线程池”,JDK为我们提供了一个很大的concurrent工具包,最后我们会对里面的内容进行探索。
为什么要线程同步?
说到线程同步,大部分情况下, 我们是在针对“单对象多线程”的情况进行讨论,一般会将其分成两部分,一部分是关于“共享变量”,一部分关于“执行步骤”。
共享变量
当我们在线程对象(Runnable)中定义了全局变量,run方法会修改该变量时,如果有多个线程同时使用该线程对象,那么就会造成全局变量的值被同时修改,造成错误。我们来看下面的代码:
1 class MyRunner implements Runnable 2 { 3 public int sum = 0; 4 5 public void run() 6 { 7 System.out.println(Thread.currentThread().getName() + " Start."); 8 for (int i = 1; i <= 100; i++) 9 { 10 sum += i; 11 } 12 try { 13 Thread.sleep(500); 14 } catch (InterruptedException e) { 15 e.printStackTrace(); 16 } 17 System.out.println(Thread.currentThread().getName() + " --- The value of sum is " + sum); 18 System.out.println(Thread.currentThread().getName() + " End."); 19 } 20 } 21 22 23 private static void sharedVaribleTest() throws InterruptedException 24 { 25 MyRunner runner = new MyRunner(); 26 Thread thread1 = new Thread(runner); 27 Thread thread2 = new Thread(runner); 28 thread1.setDaemon(true); 29 thread2.setDaemon(true); 30 thread1.start(); 31 thread2.start(); 32 thread1.join(); 33 thread2.join(); 34 }
这个示例中,线程用来计算1到100的和是多少,我们知道正确结果是5050(好像是高斯小时候玩过这个?),但是上述程序返回的结果是10100,原因是两个线程同时对sum进行操作。
执行步骤
我们在多个线程运行时,可能需要某些操作合在一起作为“原子操作”,即在这些操作可以看做是“单线程”的,例如我们可能希望输出结果的样子是这样的:
1 线程1:步骤1 2 线程1:步骤2 3 线程1:步骤3 4 线程2:步骤1 5 线程2:步骤2 6 线程2:步骤3
如果同步控制不好,出来的样子可能是这样的:
线程1:步骤1
线程2:步骤1
线程1:步骤2
线程2:步骤2
线程1:步骤3
线程2:步骤3
这里我们也给出一个示例代码:
1 class MyNonSyncRunner implements Runnable 2 { 3 public void run() { 4 System.out.println(Thread.currentThread().getName() + " Start."); 5 for(int i = 1; i <= 5; i++) 6 { 7 System.out.println(Thread.currentThread().getName() + " Running step " + i); 8 try 9 { 10 Thread.sleep(50); 11 } 12 catch(InterruptedException ex) 13 { 14 ex.printStackTrace(); 15 } 16 } 17 System.out.println(Thread.currentThread().getName() + " End."); 18 } 19 } 20 21 22 private static void syncTest() throws InterruptedException 23 { 24 MyNonSyncRunner runner = new MyNonSyncRunner(); 25 Thread thread1 = new Thread(runner); 26 Thread thread2 = new Thread(runner); 27 thread1.setDaemon(true); 28 thread2.setDaemon(true); 29 thread1.start(); 30 thread2.start(); 31 thread1.join(); 32 thread2.join(); 33 }
如何控制线程同步
既然线程同步有上述问题,那么我们应该如何去解决呢?针对不同原因造成的同步问题,我们可以采取不同的策略。
控制共享变量
我们可以采取3种方式来控制共享变量。
将“单对象多线程”修改成“多对象多线程”
上文提及,同步问题一般发生在“单对象多线程”的场景中,那么最简单的处理方式就是将运行模型修改成“多对象多线程”的样子,针对上面示例中的同步问题,修改后的代码如下:
1 private static void sharedVaribleTest2() throws InterruptedException 2 { 3 Thread thread1 = new Thread(new MyRunner()); 4 Thread thread2 = new Thread(new MyRunner()); 5 thread1.setDaemon(true); 6 thread2.setDaemon(true); 7 thread1.start(); 8 thread2.start(); 9 thread1.join(); 10 thread2.join(); 11 }
我们可以看到,上述代码中两个线程使用了两个不同的Runnable实例,它们在运行过程中,就不会去访问同一个全局变量。
将“全局变量”降级为“局部变量”
既然是共享变量造成的问题,那么我们可以将共享变量改为“不共享”,即将其修改为局部变量。这样也可以解决问题,同样针对上面的示例,这种解决方式的代码如下:
1 class MyRunner2 implements Runnable 2 { 3 public void run() 4 { 5 System.out.println(Thread.currentThread().getName() + " Start."); 6 int sum = 0; 7 for (int i = 1; i <= 100; i++) 8 { 9 sum += i; 10 } 11 try { 12 Thread.sleep(500); 13 } catch (InterruptedException e) { 14 e.printStackTrace(); 15 } 16 System.out.println(Thread.currentThread().getName() + " --- The value of sum is " + sum); 17 System.out.println(Thread.currentThread().getName() + " End."); 18 } 19 } 20 21 22 private static void sharedVaribleTest3() throws InterruptedException 23 { 24 MyRunner2 runner = new MyRunner2(); 25 Thread thread1 = new Thread(runner); 26 Thread thread2 = new Thread(runner); 27 thread1.setDaemon(true); 28 thread2.setDaemon(true); 29 thread1.start(); 30 thread2.start(); 31 thread1.join(); 32 thread2.join(); 33 }
我们可以看出,sum变量已经由全局变量变为run方法内部的局部变量了。
使用ThreadLocal机制
ThreadLocal是JDK引入的一种机制,它用于解决线程间共享变量,使用ThreadLocal声明的变量,即使在线程中属于全局变量,针对每个线程来讲,这个变量也是独立的。
我们可以用这种方式来改造上面的代码,如下所示:
1 class MyRunner3 implements Runnable 2 { 3 public ThreadLocal<Integer> tl = new ThreadLocal<Integer>(); 4 5 public void run() 6 { 7 System.out.println(Thread.currentThread().getName() + " Start."); 8 for (int i = 0; i <= 100; i++) 9 { 10 if (tl.get() == null) 11 { 12 tl.set(new Integer(0)); 13 } 14 int sum = ((Integer)tl.get()).intValue(); 15 sum+= i; 16 tl.set(new Integer(sum)); 17 try { 18 Thread.sleep(10); 19 } catch (InterruptedException e) { 20 e.printStackTrace(); 21 } 22 } 23 24 System.out.println(Thread.currentThread().getName() + " --- The value of sum is " + ((Integer)tl.get()).intValue()); 25 System.out.println(Thread.currentThread().getName() + " End."); 26 } 27 } 28 29 30 private static void sharedVaribleTest4() throws InterruptedException 31 { 32 MyRunner3 runner = new MyRunner3(); 33 Thread thread1 = new Thread(runner); 34 Thread thread2 = new Thread(runner); 35 thread1.setDaemon(true); 36 thread2.setDaemon(true); 37 thread1.start(); 38 thread2.start(); 39 thread1.join(); 40 thread2.join(); 41 }
综上三种方案,第一种方案会降低多线程执行的效率,因此,我们推荐使用第二种或者第三种方案。
控制执行步骤
说到执行步骤,我们可以使用synchronized关键字来解决它。
1 class MySyncRunner implements Runnable 2 { 3 public void run() { 4 synchronized(this) 5 { 6 System.out.println(Thread.currentThread().getName() + " Start."); 7 for(int i = 1; i <= 5; i++) 8 { 9 System.out.println(Thread.currentThread().getName() + " Running step " + i); 10 try 11 { 12 Thread.sleep(50); 13 } 14 catch(InterruptedException ex) 15 { 16 ex.printStackTrace(); 17 } 18 } 19 System.out.println(Thread.currentThread().getName() + " End."); 20 } 21 } 22 } 23 24 25 private static void syncTest2() throws InterruptedException 26 { 27 MySyncRunner runner = new MySyncRunner(); 28 Thread thread1 = new Thread(runner); 29 Thread thread2 = new Thread(runner); 30 thread1.setDaemon(true); 31 thread2.setDaemon(true); 32 thread1.start(); 33 thread2.start(); 34 thread1.join(); 35 thread2.join(); 36 }
在线程同步的话题上,synchronized是一个非常重要的关键字。它的原理和数据库中事务锁的原理类似。我们在使用过程中,应该尽量缩减synchronized覆盖的范围,原因有二:1)被它覆盖的范围是串行的,效率低;2)容易产生死锁。我们来看下面的示例:
1 private static void syncTest3() throws InterruptedException 2 { 3 final List<Integer> list = new ArrayList<Integer>(); 4 5 Thread thread1 = new Thread() 6 { 7 public void run() 8 { 9 System.out.println(Thread.currentThread().getName() + " Start."); 10 Random r = new Random(100); 11 synchronized(list) 12 { 13 for (int i = 0; i < 5; i++) 14 { 15 list.add(new Integer(r.nextInt())); 16 } 17 System.out.println("The size of list is " + list.size()); 18 } 19 try 20 { 21 Thread.sleep(500); 22 } 23 catch(InterruptedException ex) 24 { 25 ex.printStackTrace(); 26 } 27 System.out.println(Thread.currentThread().getName() + " End."); 28 } 29 }; 30 31 Thread thread2 = new Thread() 32 { 33 public void run() 34 { 35 System.out.println(Thread.currentThread().getName() + " Start."); 36 Random r = new Random(100); 37 synchronized(list) 38 { 39 for (int i = 0; i < 5; i++) 40 { 41 list.add(new Integer(r.nextInt())); 42 } 43 System.out.println("The size of list is " + list.size()); 44 } 45 try 46 { 47 Thread.sleep(500); 48 } 49 catch(InterruptedException ex) 50 { 51 ex.printStackTrace(); 52 } 53 System.out.println(Thread.currentThread().getName() + " End."); 54 } 55 }; 56 57 thread1.start(); 58 thread2.start(); 59 thread1.join(); 60 thread2.join(); 61 }
我们应该把需要同步的内容集中在一起,尽量不包含其他不相关的、消耗大量资源的操作,示例中线程休眠的操作显然不应该包括在里面。
构造线程池
我们在Java回顾之网络通信中,已经构建了一个Socket连接池,这里我们在此基础上,构建一个线程池,完成基本的启动、休眠、唤醒、停止操作。
基本思路还是以数组的形式保持一系列线程,通过Socket通信,客户端向服务器端发送命令,当服务器端接收到命令后,根据收到的命令对线程数组中的线程进行操作。
Socket客户端的代码保持不变,依然采用构建Socket连接池时的代码,我们主要针对服务器端进行改造。
首先,我们需要定义一个线程对象,它用来执行我们的业务操作,这里简化起见,只让线程进行休眠。
1 enum ThreadStatus 2 { 3 Initial, 4 Running, 5 Sleeping, 6 Stopped 7 } 8 9 enum ThreadTask 10 { 11 Start, 12 Stop, 13 Sleep, 14 Wakeup 15 } 16 17 18 class MyThread extends Thread 19 { 20 public ThreadStatus status = ThreadStatus.Initial; 21 public ThreadTask task; 22 public void run() 23 { 24 status = ThreadStatus.Running; 25 while(true) 26 { 27 try { 28 Thread.sleep(3000); 29 if (status == ThreadStatus.Sleeping) 30 { 31 System.out.println(Thread.currentThread().getName() + " 进入休眠状态。"); 32 this.wait(); 33 } 34 } catch (InterruptedException e) { 35 System.out.println(Thread.currentThread().getName() + " 运行过程中出现错误。"); 36 status = ThreadStatus.Stopped; 37 } 38 } 39 } 40 }
然后,我们需要定义一个线程管理器,它用来对线程池中的线程进行管理,代码如下:
1 class MyThreadManager 2 { 3 public static void manageThread(MyThread[] threads, ThreadTask task) 4 { 5 for (int i = 0; i < threads.length; i++) 6 { 7 synchronized(threads[i]) 8 { 9 manageThread(threads[i], task); 10 } 11 } 12 System.out.println(getThreadStatus(threads)); 13 } 14 15 public static void manageThread(MyThread thread, ThreadTask task) 16 { 17 if (task == ThreadTask.Start) 18 { 19 if (thread.status == ThreadStatus.Running) 20 { 21 return; 22 } 23 if (thread.status == ThreadStatus.Stopped) 24 { 25 thread = new MyThread(); 26 } 27 thread.status = ThreadStatus.Running; 28 thread.start(); 29 30 } 31 else if (task == ThreadTask.Stop) 32 { 33 if (thread.status != ThreadStatus.Stopped) 34 { 35 thread.interrupt(); 36 thread.status = ThreadStatus.Stopped; 37 } 38 } 39 else if (task == ThreadTask.Sleep) 40 { 41 thread.status = ThreadStatus.Sleeping; 42 } 43 else if (task == ThreadTask.Wakeup) 44 { 45 thread.notify(); 46 thread.status = ThreadStatus.Running; 47 } 48 } 49 50 public static String getThreadStatus(MyThread[] threads) 51 { 52 StringBuffer sb = new StringBuffer(); 53 for (int i = 0; i < threads.length; i++) 54 { 55 sb.append(threads[i].getName() + "的状态:" + threads[i].status).append("\r\n"); 56 } 57 return sb.toString(); 58 } 59 }
最后,是我们的服务器端,它不断接受客户端的请求,每收到一个连接请求,服务器端会新开一个线程,来处理后续客户端发来的各种操作指令。
1 public class MyThreadPool { 2 3 public static void main(String[] args) throws IOException 4 { 5 MyThreadPool pool = new MyThreadPool(5); 6 } 7 8 private int threadCount; 9 private MyThread[] threads = null; 10 11 12 public MyThreadPool(int count) throws IOException 13 { 14 this.threadCount = count; 15 threads = new MyThread[count]; 16 for (int i = 0; i < threads.length; i++) 17 { 18 threads[i] = new MyThread(); 19 threads[i].start(); 20 } 21 Init(); 22 } 23 24 private void Init() throws IOException 25 { 26 ServerSocket serverSocket = new ServerSocket(5678); 27 while(true) 28 { 29 final Socket socket = serverSocket.accept(); 30 Thread thread = new Thread() 31 { 32 public void run() 33 { 34 try 35 { 36 System.out.println("检测到一个新的Socket连接。"); 37 BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream())); 38 PrintStream ps = new PrintStream(socket.getOutputStream()); 39 String line = null; 40 while((line = br.readLine()) != null) 41 { 42 System.out.println(line); 43 if (line.equals("Count")) 44 { 45 System.out.println("线程池中有5个线程"); 46 } 47 else if (line.equals("Status")) 48 { 49 String status = MyThreadManager.getThreadStatus(threads); 50 System.out.println(status); 51 } 52 else if (line.equals("StartAll")) 53 { 54 MyThreadManager.manageThread(threads, ThreadTask.Start); 55 } 56 else if (line.equals("StopAll")) 57 { 58 MyThreadManager.manageThread(threads, ThreadTask.Stop); 59 } 60 else if (line.equals("SleepAll")) 61 { 62 MyThreadManager.manageThread(threads, ThreadTask.Sleep); 63 } 64 else if (line.equals("WakeupAll")) 65 { 66 MyThreadManager.manageThread(threads, ThreadTask.Wakeup); 67 } 68 else if (line.equals("End")) 69 { 70 break; 71 } 72 else 73 { 74 System.out.println("Command:" + line); 75 } 76 ps.println("OK"); 77 ps.flush(); 78 } 79 } 80 catch(Exception ex) 81 { 82 ex.printStackTrace(); 83 } 84 } 85 }; 86 thread.start(); 87 } 88 } 89 }
探索JDK中的concurrent工具包
为了简化开发人员在进行多线程开发时的工作量,并减少程序中的bug,JDK提供了一套concurrent工具包,我们可以用它来方便的开发多线程程序。
线程池
我们在上面实现了一个非常“简陋”的线程池,concurrent工具包中也提供了线程池,而且使用非常方便。
concurrent工具包中的线程池分为3类:ScheduledThreadPool、FixedThreadPool和CachedThreadPool。
首先我们来定义一个Runnable的对象
1 class MyRunner implements Runnable 2 { 3 public void run() { 4 System.out.println(Thread.currentThread().getName() + "运行开始"); 5 for(int i = 0; i < 1; i++) 6 { 7 try 8 { 9 System.out.println(Thread.currentThread().getName() + "正在运行"); 10 Thread.sleep(200); 11 } 12 catch(Exception ex) 13 { 14 ex.printStackTrace(); 15 } 16 } 17 System.out.println(Thread.currentThread().getName() + "运行结束"); 18 } 19 }
可以看出,它的功能非常简单,只是输出了线程的执行过程。
ScheduledThreadPool
这和我们平时使用的ScheduledTask比较类似,或者说很像Timer,它可以使得一个线程在指定的一段时间内开始运行,并且在间隔另外一段时间后再次运行,直到线程池关闭。
示例代码如下:
1 private static void scheduledThreadPoolTest() 2 { 3 final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3); 4 5 MyRunner runner = new MyRunner(); 6 7 final ScheduledFuture<?> handler1 = scheduler.scheduleAtFixedRate(runner, 1, 10, TimeUnit.SECONDS); 8 final ScheduledFuture<?> handler2 = scheduler.scheduleWithFixedDelay(runner, 2, 10, TimeUnit.SECONDS); 9 10 scheduler.schedule(new Runnable() 11 { 12 public void run() 13 { 14 handler1.cancel(true); 15 handler2.cancel(true); 16 scheduler.shutdown(); 17 } 18 }, 30, TimeUnit.SECONDS 19 ); 20 }
FixedThreadPool
这是一个指定容量的线程池,即我们可以指定在同一时间,线程池中最多有多个线程在运行,超出的线程,需要等线程池中有空闲线程时,才能有机会运行。
来看下面的代码:
1 private static void fixedThreadPoolTest() 2 { 3 ExecutorService exec = Executors.newFixedThreadPool(3); 4 for(int i = 0; i < 5; i++) 5 { 6 MyRunner runner = new MyRunner(); 7 exec.execute(runner); 8 } 9 exec.shutdown(); 10 }
注意它的输出结果:
pool-1-thread-1运行开始 pool-1-thread-1正在运行 pool-1-thread-2运行开始 pool-1-thread-2正在运行 pool-1-thread-3运行开始 pool-1-thread-3正在运行 pool-1-thread-1运行结束 pool-1-thread-1运行开始 pool-1-thread-1正在运行 pool-1-thread-2运行结束 pool-1-thread-2运行开始 pool-1-thread-2正在运行 pool-1-thread-3运行结束 pool-1-thread-1运行结束 pool-1-thread-2运行结束
可以看到从始至终,最多有3个线程在同时运行。
CachedThreadPool
这是另外一种线程池,它不需要指定容量,只要有需要,它就会创建新的线程。
它的使用方式和FixedThreadPool非常像,来看下面的代码:
1 private static void cachedThreadPoolTest() 2 { 3 ExecutorService exec = Executors.newCachedThreadPool(); 4 for(int i = 0; i < 5; i++) 5 { 6 MyRunner runner = new MyRunner(); 7 exec.execute(runner); 8 } 9 exec.shutdown(); 10 }
它的执行结果如下:
pool-1-thread-1运行开始 pool-1-thread-1正在运行 pool-1-thread-2运行开始 pool-1-thread-2正在运行 pool-1-thread-3运行开始 pool-1-thread-3正在运行 pool-1-thread-4运行开始 pool-1-thread-4正在运行 pool-1-thread-5运行开始 pool-1-thread-5正在运行 pool-1-thread-1运行结束 pool-1-thread-2运行结束 pool-1-thread-3运行结束 pool-1-thread-4运行结束 pool-1-thread-5运行结束
可以看到,它创建了5个线程。
处理线程返回值
在有些情况下,我们需要使用线程的返回值,在上述的所有代码中,线程这是执行了某些操作,没有任何返回值。
如何做到这一点呢?我们可以使用JDK中的Callable<T>和CompletionService<T>,前者返回单个线程的结果,后者返回一组线程的结果。
返回单个线程的结果
还是直接看代码吧:
1 private static void callableTest() throws InterruptedException, ExecutionException 2 { 3 ExecutorService exec = Executors.newFixedThreadPool(1); 4 Callable<String> call = new Callable<String>() 5 { 6 public String call() 7 { 8 return "Hello World."; 9 } 10 }; 11 Future<String> result = exec.submit(call); 12 System.out.println("线程的返回值是" + result.get()); 13 exec.shutdown(); 14 }
执行结果如下:
线程的返回值是Hello World.
返回线程池中每个线程的结果
这里需要使用CompletionService<T>,代码如下:
1 private static void completionServiceTest() throws InterruptedException, ExecutionException 2 { 3 ExecutorService exec = Executors.newFixedThreadPool(10); 4 CompletionService<String> service = new ExecutorCompletionService<String>(exec); 5 for (int i = 0; i < 10; i++) 6 { 7 Callable<String> call = new Callable<String>() 8 { 9 public String call() throws InterruptedException 10 { 11 return Thread.currentThread().getName(); 12 } 13 }; 14 service.submit(call); 15 } 16 17 Thread.sleep(1000); 18 for(int i = 0; i < 10; i++) 19 { 20 Future<String> result = service.take(); 21 System.out.println("线程的返回值是" + result.get()); 22 } 23 exec.shutdown(); 24 }
执行结果如下:
线程的返回值是pool-2-thread-1 线程的返回值是pool-2-thread-2 线程的返回值是pool-2-thread-3 线程的返回值是pool-2-thread-5 线程的返回值是pool-2-thread-4 线程的返回值是pool-2-thread-6 线程的返回值是pool-2-thread-8 线程的返回值是pool-2-thread-7 线程的返回值是pool-2-thread-9 线程的返回值是pool-2-thread-10
实现生产者-消费者模型
对于生产者-消费者模型来说,我们应该都不会陌生,通常我们都会使用某种数据结构来实现它。在concurrent工具包中,我们可以使用BlockingQueue来实现生产者-消费者模型,如下:
1 public class BlockingQueueSample { 2 3 public static void main(String[] args) 4 { 5 blockingQueueTest(); 6 } 7 8 private static void blockingQueueTest() 9 { 10 final BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>(); 11 final int maxSleepTimeForSetter = 10; 12 final int maxSleepTimerForGetter = 10; 13 14 Runnable setter = new Runnable() 15 { 16 public void run() 17 { 18 Random r = new Random(); 19 while(true) 20 { 21 int value = r.nextInt(100); 22 try 23 { 24 queue.put(new Integer(value)); 25 System.out.println(Thread.currentThread().getName() + "---向队列中插入值" + value); 26 Thread.sleep(r.nextInt(maxSleepTimeForSetter) * 1000); 27 } 28 catch(Exception ex) 29 { 30 ex.printStackTrace(); 31 } 32 } 33 } 34 }; 35 36 Runnable getter = new Runnable() 37 { 38 public void run() 39 { 40 Random r = new Random(); 41 while(true) 42 { 43 try 44 { 45 if (queue.size() == 0) 46 { 47 System.out.println(Thread.currentThread().getName() + "---队列为空"); 48 } 49 else 50 { 51 int value = queue.take().intValue(); 52 System.out.println(Thread.currentThread().getName() + "---从队列中获取值" + value); 53 } 54 Thread.sleep(r.nextInt(maxSleepTimerForGetter) * 1000); 55 } 56 catch(Exception ex) 57 { 58 ex.printStackTrace(); 59 } 60 } 61 } 62 }; 63 64 ExecutorService exec = Executors.newFixedThreadPool(2); 65 exec.execute(setter); 66 exec.execute(getter); 67 } 68 }
我们定义了两个线程,一个线程向Queue中添加数据,一个线程从Queue中取数据。我们可以通过控制maxSleepTimeForSetter和maxSleepTimerForGetter的值,来使得程序得出不同的结果。
可能的执行结果如下:
pool-1-thread-1---向队列中插入值88 pool-1-thread-2---从队列中获取值88 pool-1-thread-1---向队列中插入值75 pool-1-thread-2---从队列中获取值75 pool-1-thread-2---队列为空 pool-1-thread-2---队列为空 pool-1-thread-2---队列为空 pool-1-thread-1---向队列中插入值50 pool-1-thread-2---从队列中获取值50 pool-1-thread-2---队列为空 pool-1-thread-2---队列为空 pool-1-thread-2---队列为空 pool-1-thread-2---队列为空 pool-1-thread-2---队列为空 pool-1-thread-1---向队列中插入值51 pool-1-thread-1---向队列中插入值92 pool-1-thread-2---从队列中获取值51 pool-1-thread-2---从队列中获取值92
因为Queue中的值和Thread的休眠时间都是随机的,所以执行结果也不是固定的。
使用信号量来控制线程
JDK提供了Semaphore来实现“信号量”的功能,它提供了两个方法分别用于获取和释放信号量:acquire和release,示例代码如下:
1 private static void semaphoreTest() 2 { 3 ExecutorService exec = Executors.newFixedThreadPool(10); 4 final Semaphore semp = new Semaphore(2); 5 6 for (int i = 0; i < 10; i++) 7 { 8 Runnable runner = new Runnable() 9 { 10 public void run() 11 { 12 try 13 { 14 semp.acquire(); 15 System.out.println(new Date() + " " + Thread.currentThread().getName() + "正在执行。"); 16 Thread.sleep(5000); 17 semp.release(); 18 } 19 catch(Exception ex) 20 { 21 ex.printStackTrace(); 22 } 23 } 24 }; 25 exec.execute(runner); 26 } 27 28 exec.shutdown(); 29 }
执行结果如下:
Tue May 07 11:22:11 CST 2013 pool-1-thread-1正在执行。 Tue May 07 11:22:11 CST 2013 pool-1-thread-2正在执行。 Tue May 07 11:22:17 CST 2013 pool-1-thread-3正在执行。 Tue May 07 11:22:17 CST 2013 pool-1-thread-4正在执行。 Tue May 07 11:22:22 CST 2013 pool-1-thread-5正在执行。 Tue May 07 11:22:22 CST 2013 pool-1-thread-6正在执行。 Tue May 07 11:22:27 CST 2013 pool-1-thread-7正在执行。 Tue May 07 11:22:27 CST 2013 pool-1-thread-8正在执行。 Tue May 07 11:22:32 CST 2013 pool-1-thread-10正在执行。 Tue May 07 11:22:32 CST 2013 pool-1-thread-9正在执行。
可以看出,尽管线程池中创建了10个线程,但是同时运行的,只有2个线程。
控制线程池中所有线程的执行步骤
在前面,我们已经提到,可以用synchronized关键字来控制单个线程中的执行步骤,那么如果我们想要对线程池中的所有线程的执行步骤进行控制的话,应该如何实现呢?
我们有两种方式,一种是使用CyclicBarrier,一种是使用CountDownLatch。
CyclicBarrier使用了类似于Object.wait的机制,它的构造函数中需要接收一个整型数字,用来说明它需要控制的线程数目,当在线程的run方法中调用它的await方法时,它会保证所有的线程都执行到这一步,才会继续执行后面的步骤。
示例代码如下:
1 class MyRunner2 implements Runnable 2 { 3 private CyclicBarrier barrier = null; 4 public MyRunner2(CyclicBarrier barrier) 5 { 6 this.barrier = barrier; 7 } 8 9 public void run() { 10 Random r = new Random(); 11 try 12 { 13 for (int i = 0; i < 3; i++) 14 { 15 Thread.sleep(r.nextInt(10) * 1000); 16 System.out.println(new Date() + "--" + Thread.currentThread().getName() + "--第" + (i + 1) + "次等待。"); 17 barrier.await(); 18 } 19 } 20 catch(Exception ex) 21 { 22 ex.printStackTrace(); 23 } 24 } 25 26 } 27 28 private static void cyclicBarrierTest() 29 { 30 CyclicBarrier barrier = new CyclicBarrier(3); 31 32 ExecutorService exec = Executors.newFixedThreadPool(3); 33 for (int i = 0; i < 3; i++) 34 { 35 exec.execute(new MyRunner2(barrier)); 36 } 37 exec.shutdown(); 38 }
执行结果如下:
Tue May 07 11:31:20 CST 2013--pool-1-thread-2--第1次等待。 Tue May 07 11:31:21 CST 2013--pool-1-thread-3--第1次等待。 Tue May 07 11:31:24 CST 2013--pool-1-thread-1--第1次等待。 Tue May 07 11:31:24 CST 2013--pool-1-thread-1--第2次等待。 Tue May 07 11:31:26 CST 2013--pool-1-thread-3--第2次等待。 Tue May 07 11:31:30 CST 2013--pool-1-thread-2--第2次等待。 Tue May 07 11:31:32 CST 2013--pool-1-thread-1--第3次等待。 Tue May 07 11:31:33 CST 2013--pool-1-thread-3--第3次等待。 Tue May 07 11:31:33 CST 2013--pool-1-thread-2--第3次等待。
可以看出,thread-2到第1次等待点时,一直等到thread-1到达后才继续执行。
CountDownLatch则是采取类似”倒计时计数器”的机制来控制线程池中的线程,它有CountDown和Await两个方法。示例代码如下:
1 private static void countdownLatchTest() throws InterruptedException 2 { 3 final CountDownLatch begin = new CountDownLatch(1); 4 final CountDownLatch end = new CountDownLatch(5); 5 ExecutorService exec = Executors.newFixedThreadPool(5); 6 for (int i = 0; i < 5; i++) 7 { 8 Runnable runner = new Runnable() 9 { 10 public void run() 11 { 12 Random r = new Random(); 13 try 14 { 15 begin.await(); 16 System.out.println(Thread.currentThread().getName() + "运行开始"); 17 Thread.sleep(r.nextInt(10)*1000); 18 System.out.println(Thread.currentThread().getName() + "运行结束"); 19 } 20 catch(Exception ex) 21 { 22 ex.printStackTrace(); 23 } 24 finally 25 { 26 end.countDown(); 27 } 28 } 29 }; 30 exec.execute(runner); 31 } 32 begin.countDown(); 33 end.await(); 34 System.out.println(Thread.currentThread().getName() + "运行结束"); 35 exec.shutdown(); 36 }
执行结果如下:
pool-1-thread-1运行开始 pool-1-thread-5运行开始 pool-1-thread-2运行开始 pool-1-thread-3运行开始 pool-1-thread-4运行开始 pool-1-thread-2运行结束 pool-1-thread-1运行结束 pool-1-thread-3运行结束 pool-1-thread-5运行结束 pool-1-thread-4运行结束 main运行结束