通过Pool类批量创建进程
若创建的进程数量不多,可以直接使用Process类创建多个子进程。但是有时需要操作多个文件目录,或者远程控制多台计算机,这时对进程数量的需求会非常大,手动地创建多个进程的方式显然是不可取的,不仅低效繁琐,而且工作量巨大。因此,多进程模块multiprocessing中提供了Pool(进程池)类,可以批量创建子进程。
通过Pool类的构造方法可以创建一个进程池,该方法的声明如下:
Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None, context=None)
以上方法中常用参数的含义如下:
processes,表示进程的数量。若processes参数设为None,则会使用os.cpu_count()返回的数量。
maxtasksperchild,进程退出之前可以完成的任务数量,完成之后使用新的进程替换原进程,以释放闲置资源。
context,用于设定工作进程启动时的上下文。
使用Pool类的构造方法批量创建5个子进程,示例代码如下:
from multiprocessing import Pool
pool = Pool(processes=5)
进程池的内部维护了一个进程序列。当使用进程池中的进程执行任务时,如果没有达到进程池中的进程数量的最大值,那么会创建一个新的进程来执行任务;如果进程池中没有可供使用的进程,那么程序会等待,直到进程池中有可用的进程为止。
Pool类中提供了一些操作进程池的方法,关于这些方法说明如表1所示。
表1 Pool类的常见方法
方法名 | 说明 |
---|---|
apply_async() | 非阻塞式地给进程池添加任务 |
apply() | 阻塞式地给进程池添加任务 |
close() | 关闭进程池,阻止更多的任务提交到进程,待所有任务执行完成后进程会退出 |
terminate() | 结束进程,不再处理未完成的任务 |
join() | 等待进程的退出,必须在close()或terminate()之后使用 |
表1中的前两个为进程池添加任务的方法的不同之处在于:apply_async()在进程池创建完进程后立刻返回,不会等到进程执行结束;apply()在创建完进程、且进程中所有的任务执行完毕后才返回。
下面针对这两种任务添加的方式分别介绍。
1. 进程池非阻塞式添加任务
apply_async()方法的声明如下:
apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None)
以上方法中常用参数的含义如下:
func,表示函数名称。
args和kwds,表示提供给func函数的参数。
callback,表示回调函数。
error_callback,表示程序执行失败后会调用的函数。
接下来,通过一个案例来演示如何非阻塞式地往进程池中添加任务,代码如下。
1 from multiprocessing import Pool
2 import time
3 import os
4 def work(num):
5 print('进程%s:执行任务%d'% (os.getpid(), num))
6 time.sleep(2)
7 if __name__ == '__main__':
8 pool = Pool(3) # 创建进程池,指定最大进程数量为3
9 for i in range(9):
10 pool.apply_async(work, (i,)) # 进程池添加、执行任务
11 time.sleep(3)
12 print('主进程执行结束')
以上代码中,第4~6行定义了一个进程池待执行的任务函数work(),该函数内部调用sleep()函数休眠两秒钟;第7~12行是程序启动会执行的代码,其中,第8行创建了一个具有3个工作进程的进程池,第9~10行在进程池中添加了9个任务,第11行调用sleep()函数让主进程休眠3秒钟,第12行用“主进程执行结束”提示程序结束。
程序的一次执行结果如下:
进程6956: 执行任务0
进程6776: 执行任务1
进程5076: 执行任务2
进程6956: 执行任务3
进程6776: 执行任务4
进程5076: 执行任务5
主进程执行结束
由以上结果可知,主进程在三个子进程6956、6776和5076执行了6个任务后退出。
若希望主进程能等待所有的子进程执行完之后结束,需要通过join()方法将主进程切换成阻塞状态。在上述示例中main语句的末尾增加以下语句:
pool.close() # 关闭进程池
pool.join() # 阻塞主进程
再次运行程序,程序本次的执行结果如下:
进程5228: 执行任务0
进程5144: 执行任务1
进程4776: 执行任务2
进程5228: 执行任务3
进程5144: 执行任务4
进程4776: 执行任务5
主进程执行结束
进程5144: 执行任务6
进程5228: 执行任务7
进程4776: 执行任务8
由以上结果可知,主进程在执行完打印语句之后,并没有直接退出程序,而是等子进程执行完所有的任务之后才退出。
2. 进程池阻塞式添加任务
apply ()方法的声明如下:
apply(self, func, args=(), kwds={})
以上方法的参数与apply_async()方法的参数含义相同,此处不再赘述。
接下来,通过一个案例来演示如何阻塞式地往进程池中添加任务,代码如下。
from multiprocessing import Pool
import time
import os
def work(num):
print('进程%s: 执行任务%d'% (os.getpid(), num))
time.sleep(2)
if __ name__ == ' __ main__':
pool = Pool(3) # 创建进程池,指定最大进程数量为3
for i in range(9):
pool.apply(work, (i,)) # 进程池添加、执行任务
time.sleep(3)
print('主进程执行结束')
运行程序,控制台每隔两秒钟打印一条语句,直至打印完所有的语句为止。程序执行的一次结果如下:
进程5928: 执行任务0
进程6408: 执行任务1
进程5840: 执行任务2
进程5928: 执行任务3
进程6408: 执行任务4
进程5840: 执行任务5
进程5928: 执行任务6
进程6408: 执行任务7
进程5840: 执行任务8
主进程执行结束
由以上结果可知,主进程在子进程全部执行完毕后才会退出。