4.信号量 semaphore
是可以用来控制线程执行数量的锁。
4.1 semaphore的使用
需求:现在有个文件,对文件可以进行读和写,但是写是互斥的,读是共享的。并且对读的共享数也是有控制的。
例:爬虫。控制爬虫的并发数。
import threading import time class HtmlSpider(threading.Thread): def __init__(self,url,sem): super().__init__() self.url=url self.sem=sem def run(self): time.sleep(2) print("got html text success") self.sem.release() class UrlProducer(threading.Thread): def __init__(self,sem): super().__init__() self.sem=sem def run(self): for i in range(20): self.sem.acquire() html_test=HtmlSpider("www.baidu.com/{}".format(i),self.sem) html_test.start() if __name__=="__main__": sem=threading.Semaphore(3) #设置控制的数量为3 urlproducer=UrlProducer(sem) urlproducer.start()
ps:
-
每acquire一次,数量就会被减少一,release的时候数量会自动回来。
-
需要注意sem释放的地方,应该是在HtmlSpider运行完之后进行释放。
4.2 semaphore源码
实际上semaphore就是对condition的简单应用。
a.sem=threading.Semaphore(3)
实际上就是在初始化的时候,调用了Condition。
b.self.sem.acquire()
我们简单看这个逻辑就是,如果设置的数用完了,就让condition进入wait状态,否则就把数量减一。
c.self.sem.release()
release 也是很简单的数量加一和condition的notify。
5.除了上述的对Condition的应用,queue模块中的Queue也对Condition做了更为复杂的应用。特别是queue中的put。
class Queue: def __init__(self, maxsize=0): self.maxsize = maxsize self._init(maxsize) 。。。 self.mutex = threading.Lock() self.not_empty = threading.Condition(self.mutex) self.not_full = threading.Condition(self.mutex) self.all_tasks_done = threading.Condition(self.mutex) self.unfinished_tasks = 0 def put(self, item, block=True, timeout=None): with self.not_full: if self.maxsize > 0: if not block: if self._qsize() >= self.maxsize: raise Full elif timeout is None: while self._qsize() >= self.maxsize: self.not_full.wait() elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") else: endtime = time() + timeout while self._qsize() >= self.maxsize: remaining = endtime - time() if remaining <= 0.0: raise Full self.not_full.wait(remaining) self._put(item) self.unfinished_tasks += 1 self.not_empty.notify() 。。。。。。