VB.net 2010 视频教程 VB.net 2010 视频教程 python基础视频教程
SQL Server 2008 视频教程 c#入门经典教程 Visual Basic从门到精通视频教程
当前位置:
首页 > Python基础教程 >
  • Python3标准库:threading进程中管理并发操作(6)

'(%(threadName)-10s) %(message)s',
  • )
  •  
  • lock = threading.Lock()
  • w = threading.Thread(target=worker_with, args=(lock,))
  • nw = threading.Thread(target=worker_no_with, args=(lock,))
  •  
  • w.start()
  • nw.start()
  • 函数worker_with()和worker_no_with()用等价的方式管理锁。

    1.9 同步线程

    除了使用Event,还可以通过使用一个Condition对象来同步线程。由于Condition使用了一个Lock,所以它可以绑定到一个共享资源,允许多个线程等待资源更新。在下一个例子中,consumer()线程要等待设置了Condition才能继续。producer()线程负责设置条件,以及通知其他线程继续。

    
    	
    1. import logging
    2. import threading
    3. import time
    4.  
    5. def consumer(cond):
    6. """wait for the condition and use the resource"""
    7. logging.debug('Starting consumer thread')
    8. with cond:
    9. cond.wait()
    10. logging.debug('Resource is available to consumer')
    11.  
    12. def producer(cond):
    13. """set up the resource to be used by the consumer"""
    14. logging.debug('Starting producer thread')
    15. with cond:
    16. logging.debug('Making resource available')
    17. cond.notifyAll()
    18.  
    19. logging.basicConfig(
    20. level=logging.DEBUG,
    21. format='%(asctime)s (%(threadName)-2s) %(message)s',
    22. )
    23.  
    24. condition = threading.Condition()
    25. c1 = threading.Thread(name='c1', target=consumer,
    26. args=(condition,))
    27. c2 = threading.Thread(name='c2', target=consumer,
    28. args=(condition,))
    29. p = threading.Thread(name='p', target=producer,
    30. args=(condition,))
    31.  
    32. c1.start()
    33. time.sleep(0.2)
    34. c2.start()
    35. time.sleep(0.2)
    36. p.start()

    这些线程使用with来获得与Condition关联的锁。也可以显式地使用acquire()和release()方法。

    屏障(barrier)是另一种线程同步机制。Barrier会建立一个控制点,所有参与线程会在这里阻塞,直到所有这些参与“方”都到达这一点。采用这种方法,线程可以单独启动然后暂停,直到所有线程都准备好才可以继续。

    
    	
    1. import threading
    2. import time
    3.  
    4. def worker(barrier):
    5. print(threading.current_thread().name,
    6. 'waiting for barrier with {} others'.format(
    7. barrier.n_waiting))
    8. worker_id = barrier.wait()
    9. print(threading.current_thread().name, 'after barrier',
    10. worker_id)
    11.  
    12. NUM_THREADS = 3
    13.  
    14. barrier = threading.Barrier(NUM_THREADS)
    15.  
    16. threads = [
    17. threading.Thread(
    18. name='worker-%s' % i,
    19. target=worker,
    20. args=(barrier,),
    21. )
    22. for i in range(NUM_THREADS)
    23. ]
    24.  
    25. for t in threads:
    26. print(t.name, 'starting')
    27. t.start()
    28. time.sleep(0.1)
    29.  
    30. for t in threads:
    31. t.join()

    在这个例子中,Barrier被配置为会阻塞线程,直到3个线程都在等待。满足这个条件时,所有线程被同时释放从而越过这个控制点。wait()的返回值指示了释放的参与线程数,可以用来限制一些线程做清理资源等动作。

    Barrier的abort()方法会使所有等待线程接收一个BrokenBarrierError。如果线程在wait()上被阻塞而停止处理,这就允许线程完成清理工作。

    
    	
    1. import threading
    2. import time
    3.  
    4. def worker(barrier):
    5. print(threading.current_thread().name,
    6. 'waiting for barrier with {} others'.format(
    7. barrier.n_waiting))
    8. try:
    9. worker_id = barrier.wait()
    10. except threading.BrokenBarrierError:
    11. print(threading.current_thread().name, 'aborting')
    
    相关教程