concurrent.futures.Executor类有子类ThreadPoolExecutor()和ProcessPoolExecutor()分别可实现多线程和多进程加速。 由于这部分内容过于重要,我不得不把它们从
Python实录01 -- 汇总一些小知识
中拿出来单独讲。
python3中concurrent.futures是标准库,在python2需要自己pip install futures。
使用futures的多线程/多进程处理需要从futures导入线程池处理器和进程池处理器:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
线程池处理器ThreadPoolExecutor( int max_workers)
用于多线程处理的类ThreadPoolExecutor
继承自父类Executor
,用于创建线程池,可以接受一个整型参数max_workers
,代表创建的最大线程数。参数可以为空,为空则自动创建基于机器cpu核心数的进程数。
Executor中定义了submit()方法,这个方法的作用是提交一个可执行的回调task,并返回一个future实例。比如在给图片添加噪声的代码中,定义main( imgIn)
函数为随机执行五种添加噪声的函数:
def main(imgIn):
if-else:
... ...
return imgOut
于是可以:
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(main, img)
# submit(func, argvs[])方法接收第一个参数为需要调用的方法(名),
# 其后的可选参数为func需要传入接收的形参。
future
能够使用done()
方法判断该任务是否结束,done()
方法是不阻塞的;使用result()
方法可以获取任务的返回值,这个方法是阻塞的。以上一句话不重要,重要的是submit(func, argvs[])
方法只能进行单个任务,常用的并发多个任务,需要使用map
与as_completed
。
map方法
依然以数据比赛中图片添加噪声部分为例:
if __name__ == "__main__":
img_root = "./data/images/train/"
txt_root = "./data/labels/train/"
img_list = os.listdir(img_root)
# version within zip
with concurrent.futures.ThreadPoolExecutor(max_workers) as executor:
for img, info in zip(img_list, executor.map(func, img_list)):
print(info)
# version without zip
with concurrent.futures.ThreadPoolExecutor(max_workers) as executor:
executor.map(func, img_list)
这里有三个需要注意的点:
map
方法第二个参数应当是列表list
,列表的长度是func
被调用的次数,列表的item是每次加入到func
的参数。特别的,在map中加入到func
的参数只能是单个,但item可以是高维的。- 虽然不是必须,Executor.map()方法往往与zip打包函数配合使用,这样便于打印信息。zip的使用参照Python实录01。
- 可以任意指定的
max_workers
,但建议使之不大于cpu核数。
进程池处理器ProcessPoolExecutor( int max_workers)
调用方法和ThreadPoolExecutor()一致,需要注意的有两点:
- 调用过程一定要在
__name__=="__main__:"
内部。 - 对于CPU频繁的操作(方法、函数),ProcessPoolExecutor的效率更高。这是由于GIL锁的存在使得多个线程只能调用一个cpu,GIL锁此处不展开。