当前位置:
首页 > Python基础教程 >
-
Python I/O进阶学习笔记_11. Python的多进程(2)
=="__main__":
progress = multiprocessing.Process(target=get_html,args=(3,))
print(progress.pid)
progress.start()
print(progress.pid)
progress.join()
#output
None
12864
sub_progress sccess
3.继承Process类(与之前继承Thread类的方式一样)
import multiprocessing  # multiprocess programming
import time


class progress_get_html(multiprocessing.Process):
    """Child process that sleeps ``n`` seconds and then reports success."""

    def __init__(self, n):
        super().__init__()
        self.n = n  # seconds to sleep inside run()

    def run(self):
        # run() is the code that executes in the new process after start().
        time.sleep(self.n)
        print("sub progress success")


class MyProgress(multiprocessing.Process):
    """Process whose run() launches a progress_get_html grandchild process."""

    def __init__(self, n):
        super().__init__()
        self.n = n  # forwarded to the grandchild process

    def run(self):
        grandchild = progress_get_html(self.n)
        grandchild.start()
        # The grandchild is not joined here, so with a non-zero n this line
        # prints before the grandchild's message.
        print("progress end")


if __name__ == "__main__":
    progress = MyProgress(3)
    print(progress.pid)  # None: the process has not been started yet
    progress.start()
    print(progress.pid)  # the OS pid, available once start() has run
    progress.join()

# output:
# None
# 8744
# progress end
# sub progress success
4.使用进程池
创建进程池时可以指明进程数;不指明的话,默认为CPU核数(即 os.cpu_count() or 1)。
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import pool
import multiprocessing  # multiprocess programming
import time


def get_html(n):
    """Sleep ``n`` seconds, report success, and hand ``n`` back to the caller."""
    time.sleep(n)
    print("sub_progress sccess")
    return n


if __name__ == "__main__":
    worker_count = multiprocessing.cpu_count()
    process_pool = multiprocessing.Pool(worker_count)
    async_result = process_pool.apply_async(get_html, args=(3,))
    print(async_result.get())  # blocks until the task has finished
    # close() must be called before join() so the pool stops accepting new
    # tasks; otherwise join() raises an error.
    process_pool.close()
    process_pool.join()
    print(async_result.get())  # the result is still available after join()

# output:
# sub_progress sccess
# 3
# 3
其他方法:
- imap:按照参数的输入顺序返回结果(第二个示例中的imap_unordered则按任务完成的先后顺序返回结果)
if __name__ == "__main__":
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    # imap yields results in input order. Assuming get_html(n) sleeps n
    # seconds and returns n (as defined above), "sleep 5" is still printed
    # before "sleep 3" even though the 3-second task finishes first.
    for result in pool.imap(get_html, [1, 5, 3]):
        print("sleep {} successed ".format(result))
    # Release the worker processes once every result has been consumed;
    # an unclosed pool leaks its workers.
    pool.close()
    pool.join()

# output:
# sub_progress sccess
# sleep 1 successed
# sub_progress sccess
# sub_progress sccess
# sleep 5 successed
# sleep 3 successed
if __name__ == "__main__":
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    # Unlike imap, imap_unordered yields each result as soon as its task
    # finishes, so the shorter sleeps are reported first.
    for result in pool.imap_unordered(get_html, [1, 5, 3]):
        print("sleep {} successed ".format(result))
    # Release the worker processes once every result has been consumed;
    # an unclosed pool leaks its workers.
    pool.close()
    pool.join()

# output:
# sub_progress sccess
# sleep 1 successed
# sub_progress sccess
# sleep 3 successed
# sub_progress sccess
# sleep 5 successed
三. 进程间通信
与线程间通信不同的是,线程中使用的同步类和锁(如 queue.Queue、threading.Lock)在进程间是不可用的,需要改用 multiprocessing 中对应的版本。
1.Queue(注意是multiprocessing中的Queue,而不是线程使用的queue.Queue)
from multiprocessing import Process, Queue
import time


def producer(queue, delay=2):
    """Put the string "a" on *queue*, then sleep *delay* seconds.

    *delay* defaults to 2, the value the example originally hard-coded;
    callers may shorten it (e.g. to 0) when no pause is wanted.
    """
    queue.put("a")
    time.sleep(delay)


def consumer(queue, delay=2):
    """Sleep *delay* seconds, then take one item from *queue* and print it.

    *delay* defaults to 2, the value the example originally hard-coded.
    """
    time.sleep(delay)
    data = queue.get()
    print(data)


if __name__ == "__main__":
    queue = Queue(10)  # multiprocessing.Queue with a maximum size of 10
    my_producer = Process(target=producer, args=(queue,))
    my_consumer = Process(target=consumer, args=(queue,))
    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()

# output:
# a
注意:multiprocessing中的Queue不能用于Pool进程池(它只能通过继承在进程间共享,不能作为任务参数传给池中的进程)。
2.Manager(可用于进程池)
Manager中也有一个Queue。如果想在Pool进程池的进程之间通信,需要使用Manager中的Queue。
from multiprocessing import Process, pool, Manager, Pool
import time


def producer(queue, delay=2):
    """Put the string "a" on *queue*, then sleep *delay* seconds.

    *delay* defaults to 2, the value the example originally hard-coded.
    """
    queue.put("a")
    time.sleep(delay)


def consumer(queue, delay=2):
    """Sleep *delay* seconds, then take one item from *queue* and print it.

    *delay* defaults to 2, the value the example originally hard-coded.
    """
    time.sleep(delay)
    data = queue.get()
    print(data)


if __name__ == "__main__":
    # A Manager-backed queue is a proxy object that can be passed to pool
    # workers. The `with` block shuts the manager's server process down.
    with Manager() as manager:
        queue = manager.Queue()
        # Named process_pool so the imported `pool` module is not shadowed.
        process_pool = Pool(3)
        results = [
            process_pool.apply_async(producer, args=(queue,)),
            process_pool.apply_async(consumer, args=(queue,)),
        ]
        process_pool.close()
        process_pool.join()
        # apply_async only surfaces a worker's exception through .get();
        # fetch every result so failures are raised instead of ignored.
        for result in results:
            result.get()

# output:
# a
3.管道pipe
Pipe只适用于两个进程之间的通信。
Pipe的性能高于Queue,因为Queue内部加了很多锁操作。
from multiprocessing import Process, pool, Manager, Pool, Pipe
import time


def producer(pipe):
    """Send the greeting "hello" through one end of a Pipe."""
    message = "hello"
    pipe.send(message)


def consumer(pipe):
    """Receive a single object from one end of a Pipe and print it."""
    received = pipe.recv()
    print(received)


if __name__ == "__main__":
    # Pipe() returns two connected ends: the first goes to the consumer,
    # the second to the producer.
    reader_end, writer_end = Pipe()
    workers = [
        Process(target=producer, args=(writer_end,)),
        Process(target=consumer, args=(reader_end,)),
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

# output:
# hello
4.进程间共享数据:使用Manager的dict、list、Value等(数据保存在Manager的服务进程中,各进程通过代理对象访问)。
from multiprocessing import Process, pool, Manager, Pool, Pipe
import time


def add_data(p_dict, key, value):
    """Store *value* under *key* in *p_dict*.

    *p_dict* may be a plain dict or a Manager dict proxy shared between
    processes.
    """
    p_dict[key] = value


if __name__ == "__main__":
    # The `with` block guarantees the Manager's server process is shut down.
    # The dict proxy only works while the manager is running, so every use
    # of it, including the final print, stays inside the block.
    with Manager() as manager:
        progress_dict = manager.dict()
        first_progress = Process(
            target=add_data, args=(progress_dict, "name", "tangrong")
        )
        second_progress = Process(
            target=add_data, args=(progress_dict, "age", "18")
        )
        first_progress.start()
        second_progress.start()
        first_progress.join()
        second_progress.join()
        # Key order depends on which process writes first.
        print(progress_dict)

# output:
# {'name': 'tangrong', 'age': '18'}
使用时可以直接用Manager中的数据结构,但要注意数据同步(可配合Lock、RLock等锁使用)。