学科分类
目录

通过Pool类批量创建进程

若创建的进程数量不多,可以直接使用Process类创建多个子进程。但是有时需要操作多个文件目录,或者远程控制多台计算机,这时对进程数量的需求会非常大,手动地创建多个进程的方式显然是不可取的,不仅低效繁琐,而且工作量巨大。因此,多进程模块multiprocessing中提供了Pool(进程池)类,可以批量创建子进程。

通过Pool类的构造方法可以创建一个进程池,该方法的声明如下:

Pool(processes=None, initializer=None, initargs=(),  maxtasksperchild=None, context=None)

以上方法中常用参数的含义如下:

  • processes,表示进程的数量。若processes参数设为None,则会使用os.cpu_count()返回的数量。

  • maxtasksperchild,进程退出之前可以完成的任务数量,完成之后使用新的进程替换原进程,以释放闲置资源。

  • context,用于设定工作进程启动时的上下文。

使用Pool类的构造方法批量创建5个子进程,示例代码如下:

from multiprocessing import Pool
pool = Pool(processes=5)

进程池的内部维护了一个进程序列。当使用进程池中的进程执行任务时,如果没有达到进程池中的进程数量的最大值,那么会创建一个新的进程来执行任务;如果进程池中没有可供使用的进程,那么程序会等待,直到进程池中有可用的进程为止。

Pool类中提供了一些操作进程池的方法,关于这些方法说明如表1所示。

表1 Pool类的常见方法

方法名 说明
apply_async() 非阻塞式地给进程池添加任务
apply() 阻塞式地给进程池添加任务
close() 关闭进程池,阻止更多的任务提交到进程,待所有任务执行完成后进程会退出
terminate() 结束进程,不再处理未完成的任务
join() 等待进程的退出,必须在close()或terminate()之后使用

表1中的前两个为进程池添加任务的方法的不同之处在于:apply_async()在进程池创建完进程后立刻返回,不会等到进程执行结束;apply()在创建完进程、且进程中所有的任务执行完毕后才返回。

下面针对这两种任务添加的方式分别介绍。

1. 进程池非阻塞式添加任务

apply_async()方法的声明如下:

apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None)

以上方法中常用参数的含义如下:

  • func,表示函数名称。

  • args和kwds,表示提供给func函数的参数。

  • callback,表示回调函数。

  • error_callback,表示程序执行失败后会调用的函数。

接下来,通过一个案例来演示如何非阻塞式地往进程池中添加任务,代码如下。

 1  from multiprocessing import Pool
 2  import time
 3  import os
 4  def work(num):
 5    print('进程%s:执行任务%d'% (os.getpid(), num))
 6    time.sleep(2)
 7  if __name__ == '__main__':
 8    pool = Pool(3)             # 创建进程池,指定最大进程数量为3
 9    for i in range(9):
 10     pool.apply_async(work, (i,))  # 进程池添加、执行任务
 11   time.sleep(3)
 12   print('主进程执行结束')

以上代码中,第4~6行定义了一个进程池待执行的任务函数work(),该函数内部调用sleep()函数休眠两秒钟;第7~12行是程序启动会执行的代码,其中,第8行创建了一个具有3个工作进程的进程池,第9~10行在进程池中添加了9个任务,第11行调用sleep()函数让主进程休眠3秒钟,第12行用“主进程执行结束”提示程序结束。

程序的一次执行结果如下:

进程6956: 执行任务0
进程6776: 执行任务1
进程5076: 执行任务2
进程6956: 执行任务3
进程6776: 执行任务4
进程5076: 执行任务5
主进程执行结束

由以上结果可知,主进程在三个子进程6956、6776和5076执行了6个任务后退出。

若希望主进程能等待所有的子进程执行完之后结束,需要通过join()方法将主进程切换成阻塞状态。在上述示例中main语句的末尾增加以下语句:

pool.close()      # 关闭进程池
pool.join()       # 阻塞主进程

再次运行程序,程序本次的执行结果如下:

进程5228: 执行任务0
进程5144: 执行任务1
进程4776: 执行任务2
进程5228: 执行任务3
进程5144: 执行任务4
进程4776: 执行任务5
主进程执行结束
进程5144: 执行任务6
进程5228: 执行任务7
进程4776: 执行任务8

由以上结果可知,主进程在执行完打印语句之后,并没有直接退出程序,而是等子进程执行完所有的任务之后才退出。

2. 进程池阻塞式添加任务

apply ()方法的声明如下:

apply(self, func, args=(), kwds={})

以上方法的参数与apply_async()方法的参数含义相同,此处不再赘述。

接下来,通过一个案例来演示如何阻塞式地往进程池中添加任务,代码如下。

from multiprocessing import Pool
import time
import os
def work(num):
  print('进程%s: 执行任务%d'% (os.getpid(), num))
  time.sleep(2)
if __ name__ == ' __ main__':
  pool = Pool(3)           # 创建进程池,指定最大进程数量为3
  for i in range(9):
     pool.apply(work, (i,))    # 进程池添加、执行任务
  time.sleep(3)
  print('主进程执行结束')

运行程序,控制台每隔两秒钟打印一条语句,直至打印完所有的语句为止。程序执行的一次结果如下:

进程5928: 执行任务0
进程6408: 执行任务1
进程5840: 执行任务2
进程5928: 执行任务3
进程6408: 执行任务4
进程5840: 执行任务5
进程5928: 执行任务6
进程6408: 执行任务7
进程5840: 执行任务8
主进程执行结束

由以上结果可知,主进程在子进程全部执行完毕后才会退出。

点击此处
隐藏目录