首页 > Python基础教程 >
-
python基础教程之进程与线程
1 概念梳理:
1.1 进程
一个程序的执行实例就是一个进程。每一个进程提供执行程序所需的所有资源。(进程本质上是资源的集合。)一个进程有一个虚拟的地址空间、可执行的代码、操作系统
的接口、安全的上下文(记录启动该进程的用户和权限等等)、唯一的进程ID,环境变量,优先级类,最小和最大的工作空间(内存空间),还要有至少一个线程。
每一个进程启动时都会最先产生一个线程,即主线程。然后主线程会再创建其他的子线程。
与进程相关的资源包括:
- 内存页(同一个进程中的所有线程共享同一个内存空间)
- 文件描述符(eg. open sockets)
- 安全凭证(eg. 启动该进程的用户ID)
1.2 线程
1.1.1 什么是线程
线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个
线程,每条线程并行执行不同的任务。一个
线程是一个execution context(执行上下文),即一个cpu执行是所需的一串指令。
1.1.2 线程的工作方式
假设你正在读一本书,没有读完,你想休息一下,但是你想在回来时恢复到当时读的具体进度。有一个方法就是记下页数、行数与字数这三个数值,这些数值就是execution context。如果你的室友在你休息的时候,使用相同的方法读这本书。你和她只需要这三个数字记下来就可以在交替的时间共同阅读这本书了。
线程的工作方式与此类似。CPU会给你一个在同一时间能够做多个运算的幻觉,实际上它在每个运算上只花了极少的时间,本质上CPU同一时刻只干了一件事。它能这样做
就是因为它有每个运算的execution context。就像你能够和你朋友共享同一本书一样,多任务也能共享同一块CPU。
1.3 进程和线程区别
1.同一个进程中的线程共享同一内存空间,但是进程之间是独立的。
2.同一个进程中的所有线程的数据是共享的(线程通讯),进程之间的数据是独立的。
3.对主线程的修改可能会影响其他线程的行为,但是父进程的修改(除了删除以外)不会影响其他子进程。
4.线程是一个上下文的执行指令,而进程则是与运算相关的一簇资源。
5.同一个进程的线程之间可以直接通信,但是进程之间的交流需要借助中间代理来实现。
6.创建新的线程很容易,但是创建新的进程需要对父进程做一次复制。
7.一个线程可以操作同一进程的其他线程,但是进程只能操作其子进程。
8.线程启动速度快,进程启动速度慢(但是两者运行速度没有可比性)。
1.4 总结
对于操作系统来说,一个任务就是一个进程(Process),比如打开一个浏览器就是启动一个浏览器进程打开一个记事本就启动了一个记事本进程,打开两个记事本就启动了两个记事本进程,
打开一个Word就启动了一个Word进程。
有些进程还不止同时干一件事,比如Word,它可以同时进行打字、拼写检查、打印等事情。在一个进程内部,要同时干多件事,就需要同时运行多个“子任务”,
我们把进程内的这些“子任务”称为线程(Thread)。
2 多线程:
2.1 线程常用方法
1. start() 线程准备就绪,等待CPU调度
2. setName() 为线程设置名称
3. getName() 获取线程名称
4. setDaemon(True) 设置为守护线程
5. join() 逐个执行每个线程,执行完毕后继续往下执行
6. run() 线程被CPU调度后自动执行线程对象的run方法,如果想自定义线程类,直接重写run方法就行了。
2.1.1 Thread类
1. 普通创建方式
1 1 # 新线程执行的代码 2 2 def loop(): 3 3 print('thread %s is running...' % threading.current_thread().name) 4 4 n = 0 5 5 while n < 5: 6 6 n += 1 7 7 print('thread %s >>> %s' % (threading.current_thread().name, n)) 8 8 time.sleep(1) 9 9 print('thread %s ended.' % threading.current_thread().name) 10 10 11 11 print('thread %s is running...' % threading.current_thread().name) 12 12 t = threading.Thread(target = loop, name = 'LoopThread') 13 13 t.start() 14 14 t.join() 15 15 print('thread %s ended.' % threading.current_thread().name) 16 16 17 17 """ 18 任何进程默认就会启动一个线程,我们把该线程称为主线程(MainThread)主线程又可以启动新线程, 19 Python中的Threading模块有一个current_thread()函数,它永远返回当前线程的实例子线程的名字在创建时指定, 20 我们用LoopThread命名子线程。名字仅仅在打印时用来显示,完全没有其他意义,如果不起名字Python就自动给线程命名 21 为Thread-1,Thread-2 22 """
1 thread MainThread is running... 2 thread LoopThread is running... 3 thread LoopThread >>> 1 4 thread LoopThread >>> 2 5 thread LoopThread >>> 3 6 thread LoopThread >>> 4 7 thread LoopThread >>> 5 8 thread LoopThread ended. 9 thread MainThread ended.
1 import threading 2 import time 3 4 def run(n): 5 print('task', n) 6 time.sleep(1) 7 print('2s') 8 time.sleep(1) 9 print('1s') 10 time.sleep(1) 11 print('0s') 12 time.sleep(1) 13 14 # 创建t1,t2两个线程 15 t1 = threading.Thread(target = run, args = ('t1',)) 16 t2 = threading.Thread(target = run, args = ('t2',)) 17 t1.start() 18 t2.start()
2. 继承threading.Thread来自定义线程类
其本质是重构Thread类中的run方法。
1 import threading 2 import time 3 4 class MyThread(threading.Thread): 5 def __init__(self, n): 6 super(MyThread, self).__init__() # 重构run函数必须要写 7 self.n = n 8 def run(self): 9 print('task', self.n) 10 time.sleep(1) 11 print('2s') 12 time.sleep(1) 13 print('1s') 14 time.sleep(1) 15 print('0s') 16 time.sleep(1) 17 18 if __name__ == '__main__': 19 t1 = MyThread('t1') 20 t2 = MyThread('t2') 21 22 t1.start() 23 t2.start()
2.1.2 计算子线程执行时间
1 import time, threading 2 3 def run(n): 4 print('task', n, threading.current_thread()) # 输出当前线程 5 time.sleep(1) 6 print('3s') 7 time.sleep(1) 8 print('2s') 9 time.sleep(1) 10 print('1s') 11 12 start_time = time.time() 13 14 t_obj = [] # 定义线程用于存放子线程实例 15 16 # 由主线程生成三个子线程,并将子线程添加到t_obj中 17 for i in range(3): 18 t = threading.Thread(target = run, args = ('t-%s' % i,)) 19 t.start() 20 t_obj.append(t) 21 22 # join() 等此线程执行完之后,再执行其他线程或主线程 23 for tmp in t_obj: 24 t.join() # 为每个子线程添加join之后,主线程就会等待这些子线程执行完之后再执行。 25 26 print('cost:', time.time() - start_time) # 主线程 27 28 print(threading.current_thread()) # 输出当前线程
结果:要思考清楚是,线程在执行时相当于是异步并行执行,所以说主线程与其他子线程的执行时间应该是一样的。
根据结果,最终输出的主线程的执行时间约为3.04秒,那么所有线程的执行时间也就约为3.04秒,这与程序中一共挂起3秒钟是一致的!
1 task t-0 <Thread(Thread-11, started 10848)> 2 task t-1 <Thread(Thread-12, started 14636)> 3 task t-2 <Thread(Thread-13, started 4076)> 4 5 cost: 3.0460238456726074 6 <_MainThread(MainThread, started 16912)>
2.1.3 统计当前活跃的线程数
使用Threading模块中active_count()方法,返回当前活跃的线程数
1 import threading, time 2 3 def run(n): 4 print('task', n) 5 6 for i in range(3): 7 t = threading.Thread(target = run, args = ('t-%s' % i,)) 8 t.start() 9 print(threading.active_count()) # 输出当前活跃的线程数
1 task t-0 2 task t-1 3 task t-2 4 4
2.1.4 守护线程
使用setDaemon(True)把所有的子线程都变成了主线程的守护线程,因此当主线程结束后,子线程也会随之结束。所以当主线程结束后,整个程序
就退出了。
1 import threading 2 import time 3 4 def run(n): 5 print("task", n) 6 time.sleep(1) #此时子线程停1s 7 print('3') 8 time.sleep(1) 9 print('2') 10 time.sleep(1) 11 print('1') 12 13 for i in range(3): 14 t = threading.Thread(target=run, args=("t-%s" % i,)) 15 t.setDaemon(True) #把子线程设置为守护线程,必须在start()之前设置 16 t.start() 17 18 time.sleep(0.5) #主线程停0.5秒 19 print(threading.active_count()) #输出活跃的线程数 20 """ 21 task t-0 22 task t-1 23 task t-2 24 4 25 26 Process finished with exit code 0 27 """
2.1.5 GIL
在非python环境中,单核情况下,同时只能有一个任务执行。多核时可以支持多个线程同时执行。但是在python
中,无论有多少核,同时只能执行一个线程。究其原因,这就是由于GIL的存在导致的。
GIL的全称是Global Interpreter Lock(全局解释器锁),来源是python设计之初的考虑,为了数据安全所做的决定。某个线程想要执行,必须先拿到GIL,
我们可以把GIL看作是“通行证”,并且在一个python进程中,GIL只有一个。拿不到通行证的线程,就不允许进入CPU执行。GIL只在cpython中才有,
因为cpython调用的是c语言的原生线程,所以他不能直接操作cpu,只能利用GIL保证同一时间只能有一个线程拿到数据。而在pypy和jpython中是没有GIL的。
Python多线程的工作过程:
python在使用多线程的时候,调用的是C语言的原生线程。
1. 拿到公共数据
2. 申请GIL
3. python解释器调用os原生线程
4. os操作CPU执行计算
5. 当该线程执行时间到后,无论运算是否已经执行完,GIL都被要求释放
6. 进而由其他进程重复上面的过程
7. 等其他进程执行完后,又会切换到之前的线程(从他的记录的上下文继续执行)
整个过程是每个线程执行自己的运算,当执行时间到就进行切换(context switch)。
2.1.5 线程锁、互斥锁
由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,当多个线程同时修改同一条数据时可能会出现脏数据,所以,出现了线程锁,即同一时刻允许一个线程执行操作。线程锁于
锁定资源,你可以定义多个锁, 当你需要独占某一资源时,任何一个锁都可以锁这个资源,就好比你用不同的锁都可以把相同的一个门锁住是一个道理。
由于线程之间是进行随机调度,如果有多个线程同时操作一个对象,如果没有很好地保护该对象,会造成程序结果的不可预期,我们也称此为“线程不安全”。
为了防止上面的情况发生,引入互斥锁,也就是说,一旦某个线程将某一资源锁定,那么其他线程必须等待该锁释放之后才能操作该资源。
也就是说,互斥锁只允许一个线程操作数据。
如果使用互斥锁,那么只能允许一个线程操作数据,这个效率是很低的,为了解决这个问题,我们引入信号量,也就是BoundedSemaphore类
来同时允许一定数量的线程更改数据。以下为BoundedSemaphore类代码。
1 import threading 2 import time 3 4 5 def run(n): 6 semaphore.acquire() #加锁 7 time.sleep(1) 8 print("run the thread:%s\n" % n) 9 semaphore.release() #释放 10 11 12 num = 0 13 semaphore = threading.BoundedSemaphore(5) # 最多允许5个线程同时运行 14 15 for i in range(22): 16 t = threading.Thread(target=run, args=("t-%s" % i,)) 17 t.start() 18 19 while threading.active_count() != 1: 20 pass # print threading.active_count() 21 else: 22 print('-----all threads done-----')
3 多进程:
在linux中,每个进程都是由父进程提供的,每启动一个子进程就从父进程克隆一份数据,但是进程之间的数据本身是不能共享的。
在如下代码中,创建了两个进程,分别是pw和pr。
1 from multiprocessing import Process, Queue 2 import os, time, random 3 4 # 写数据进行执行的代码 5 def write(q): 6 print('Process to write: %s' % os.getpid()) 7 for value in ['A', 'B', 'C']: 8 print('Put %s to queue...' % value) 9 q.put(value) 10 time.sleep(random.random()) 11 12 # 读进程执行的代码 13 def read(q): 14 print('Process to read: %s' % os.getpid()) 15 while True: 16 value = q.get(True) 17 print('Get %s from queue.' % value) 18 19 if __name__ == '__main__': 20 # 父进程创建Queue,并传给各个子进程 21 q = Queue() 22 23 pw = Process(target = write(q)) 24 pr = Process(target = read(q))
也可以通过Pool类来创建多个进程:
进程池中常用的方法:
apply() :同步执行(串行)
apply_async() 异步执行(并行)
teminate() 立刻关闭进程池
join() 主进程等待所有子进程执行完毕,必须在close或者terminate()之后
close() 等待所有进程结束后,才关闭进程池。
1 from multiprocessing import Pool 2 import os, time, random 3 4 def long_time_task(name): 5 print('Run task %s (%s)...' % (name, os.getpid())) 6 start = time.time() 7 time.sleep(random.random() * 3) 8 end = time.time() 9 print('Task %s runs %0.2f seconds.' % (name, (end - start))) 10 11 if __name__ == '__main__': 12 print('Parent process %s.' % os.getpid()) 13 14 # 通过用Pool类,相当于是创建了 5 个子进程的一个实例p,通过使用for循环来让这 5 个子进程 15 # 用的是apply_async()方法,所以,一个进程只管去执行,不管其他进程如何(异步执行) 16 # 作为对比apply() 方法 是同步执行 17 p = Pool(4) 18 for i in range(5): 19 p.apply(func = long_time_task, args = (i,)) 20 print('Waiting for all subprocesses done...') 21 p.close() 22 p.join() 23 print('All subprocesses done.')
1 # 使用apply_async()方法,异步执行结果 2 3 Parent process 13944. 4 Waiting for all subprocesses done... 5 Run task 0 (3464)... 6 Run task 1 (12836)... 7 Run task 2 (10256)... 8 Run task 3 (2448)... 9 Task 1 runs 1.63 seconds. 10 Run task 4 (12836)... 11 Task 2 runs 1.69 seconds. 12 Task 0 runs 1.88 seconds. 13 Task 3 runs 2.96 seconds. 14 Task 4 runs 1.50 seconds. 15 All subprocesses done.
1 # 使用apply()方法,同步执行结果 2 3 Parent process 4076. 4 Run task 0 (6152)... 5 Task 0 runs 2.36 seconds. 6 Run task 1 (15300)... 7 Task 1 runs 0.68 seconds. 8 Run task 2 (17008)... 9 Task 2 runs 0.23 seconds. 10 Run task 3 (11900)... 11 Task 3 runs 1.07 seconds. 12 Run task 4 (6152)... 13 Task 4 runs 2.02 seconds. 14 Waiting for all subprocesses done... 15 All subprocesses done.
3.1 进程间通信
多进程之间的通信通过Queue()或者Pipe()来实现。
3.1.1 Queue()
通过Queue()来通信实际上就是通过一个队列来进行数据的传递和获取,不同的进程操作同一个队列。
1 from multiprocessing import Process, Queue 2 import os, time, random 3 4 # 写数据进行执行的代码 5 def write(q): 6 print('Process to write: %s' % os.getpid()) 7 for value in ['A', 'B', 'C']: 8 print('Put %s to queue...' % value) 9 q.put(value) 10 time.sleep(random.random()) 11 12 # 读进程执行的代码 13 def read(q): 14 print('Process to read: %s' % os.getpid()) 15 while True: 16 value = q.get(True) 17 print('Get %s from queue.' % value) 18 19 if __name__ == '__main__': 20 # 父进程创建Queue,并传给各个子进程 21 q = Queue() 22 pw = Process(target = write(q)) 23 pr = Process(target = read(q)) 24 25 # 启动子进程pw,写入: 26 pw.start() 27 # 启动子进程pr,读取: 28 pr.start() 29 # 等待pw结束 30 pw.join() 31 # pr进程里是死循环,无法等待其结束,只能强行终止 32 pr.terminate()
3.1.2 Pipe()
Pipe即管道模式,调用Pipe()返回管道两端的Connection。因此Pipe仅仅适用于只有两个进程一读一写的情况。
进程间的Pipe基于fork机制建立,当主进程创建Pipe的时候,Pipe的两个Connections连接的都是主进程
当主进程创建子进程后,Connections也被拷贝了一份,此时有了4个Connections,此后,关闭主进程的一个