VB.net 2010 视频教程 VB.net 2010 视频教程 python基础视频教程
SQL Server 2008 视频教程 c#入门经典教程 Visual Basic从门到精通视频教程
当前位置:
首页 > Python基础教程 >
  • PythonI/O进阶学习笔记_10.python的多线程(3)

 
c. self.cond.notify()
 
4.信号量 semaphore
是可以用来控制线程执行数量的锁。
 
4.1 semaphore的使用
需求:现在有个文件,对文件可以进行读和写,但是写是互斥的,读是共享的。并且对读的共享数也是有控制的。
例:爬虫。控制爬虫的并发数。
复制代码
import threading
import time
 
class HtmlSpider(threading.Thread):
    def __init__(self,url,sem):
        super().__init__()
        self.url=url
        self.sem=sem
 
    def run(self):
        time.sleep(2)
        print("got html text success")
        self.sem.release()
 
class UrlProducer(threading.Thread):
    def __init__(self,sem):
        super().__init__()
        self.sem=sem
    def run(self):
        for i in range(20):
            self.sem.acquire()
            html_test=HtmlSpider("www.baidu.com/{}".format(i),self.sem)
            html_test.start()
 
if __name__=="__main__":
    sem=threading.Semaphore(3)  #设置控制的数量为3
 
    urlproducer=UrlProducer(sem)
    urlproducer.start()
复制代码

ps:

  • 每acquire一次,数量就会被减少一,release的时候数量会自动回来。
  • 需要注意sem释放的地方,应该是在HtmlSpider运行完之后进行释放。
 
4.2 semaphore源码 
实际上semaphore就是对condition的简单应用。
 
a.sem=threading.Semaphore(3)
实际上就是在初始化的时候,调用了Condition。
 
b.self.sem.acquire()
我们简单看这个逻辑就是,如果设置的数用完了,就让condition进入wait状态,否则就把数量减一。
 
c.self.sem.release()
release 也是很简单的数量加一和condition的notify。
 
 
5.除了上述的对Condition的应用,queue模块中的Queue也对Condition做了更为复杂的应用。特别是queue中的put。
复制代码
class Queue:
    def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self._init(maxsize)
        。。。
        self.mutex = threading.Lock()
        self.not_empty = threading.Condition(self.mutex)
        self.not_full = threading.Condition(self.mutex)
 
        self.all_tasks_done = threading.Condition(self.mutex)
        self.unfinished_tasks = 0
 
    def put(self, item, block=True, timeout=None):
       
        with self.not_full:
            if self.maxsize > 0:
                if not block:
                    if self._qsize() >= self.maxsize:
                        raise Full
                elif timeout is None:
                    while self._qsize() >= self.maxsize:
                        self.not_full.wait()
                elif timeout < 0:
                    raise ValueError("'timeout' must be a non-negative number")
                else:
                    endtime = time() + timeout
                    while self._qsize() >= self.maxsize:
                        remaining = endtime - time()
                        if remaining <= 0.0:
                            raise Full
                        self.not_full.wait(remaining)
            self._put(item)
            self.unfinished_tasks += 1
            self.not_empty.notify()
    。。。。。。
复制代码

 

四. 线程池
在前面进行线程间通信的时候,想要多个线程进行并发的时候,需要我们自己去维护锁。
但是我们现在希望有工具来帮我们对想要线程并发数进行管理。于是有了线程池。
那么为什么明明有了信号量 semaphore 还需要线程池呢?
因为线程池不只是控制了线程数量而已。
 
比如说,现在有需求,在主进程中,我们需要得到某个线程的状态。
并且线程的状态不管是退出还是什么,主进程能立刻知道。
futures让多线程和多进程的接口一致。
 
1.使用线程池
concurrent.futures中有两个类ThreadPoolExecutor和 ProcessPoolExecutor 分别用于线程池和进程池的创建,基类是futures的Executor类。
使用ThreadPoolExecutor只需要将要执行的函数和要并发的线程数交给它就可以了。
使用线程池来执行线程任务的步骤如下:
a.调用 ThreadPoolExecutor 类的构造器创建一个线程池。
b.定义一个普通函数作为线程任务。
c.调用 ThreadPoolExecutor 对象的 submit() 方法来提交线程任务。submit返回的是Future类(重要)
d. 当不想提交任何任务时,调用 ThreadPoolExecutor 对象的 shutdown() 方法来关闭线程池。
复制代码
#例:将之前爬虫模拟的脚本改为线程池用。

from concurrent.futures import  ThreadPoolExecutor
import time

def get_html(times):
    time.sleep(times)
    print("get html page {} successed!".format(times))
    return times

excutor=ThreadPoolExecutor(max_workers=2)
#submit 提交到线程池
#submit的返回很重要,返回的对象Future类可以判断这个函数的执行状态等
#submit 是非阻塞的
task1=excutor.submit(get_html,(3))
task2=excutor.submit(get_html,(2))

print(task1.done())  
print(task2.done())

#result 是阻塞的,接受函数的返回
print(task1.result())
print(task2.result())

#output:
False
False
get html page 2 successed!
get html page 
      



  

相关教程
关于我们--广告服务--免责声明--本站帮助-友情链接--版权声明--联系我们       黑ICP备07002182号