一、概述
Python既支持多线程,又支持多进程。python中的多线程其实并不是真正的多线程,如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。
二、多进程模块介绍
Python提供了非常好用的多进程包multiprocessing。multiprocessing模块就是跨平台多进程模块。只需要定义一个函数,Python会完成其他所有事情。借助这个包,可以轻松完成从单进程到并发执行的转换。multiprocessing支持子进程、通信和共享数据、执行不同形式的同步。
三、multiprocessing模块
multiprocessing模块提供的类:
Process、Queue、Pipe、Lock等组件。
multiprocessing模块提供的常用方法:
multiprocessing.current_process(): 返回当前的进程变量。
四、开启进程的两种方式
有函数和类两种方式创建多进程。
函数方式创建多进程
启动一个进程就是把一个函数传入并创建Process实例,然后调用start()开始执行。
例如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
import multiprocessing
import time
def worker(sec):
time.sleep(sec)
print('这是进程的名字', multiprocessing.current_process().name)
time.sleep(sec)
if __name__ == '__main__':
print('这是主进程:', multiprocessing.current_process().name)
s_time = time.time()
p1 = multiprocessing.Process(target=worker, args=(2,))
p2 = multiprocessing.Process(target=worker, args=(2,))
p1.start()
p2.start()
p1.join()
p2.join()
print('主进程结束', multiprocessing.current_process().name)
print('一共用时', time.time()-s_time)
|
执行以上程序会输出如下结果:
1
2
3
4
5
|
这是主进程: MainProcess
这是进程的名字 Process-2
这是进程的名字 Process-1
主进程结束 MainProcess
一共用时 4.040495157241821
|
创建多进程时,只需要传入一个执行函数和函数的参数,start()启动,join()可以等待子进程结束后再继续往下运行,通常用于进程间的同步。
类方式创建多进程
使用类方式需要写一个类,继承自multiprocessing.Thread类,然后重写run()方法。
例如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
import multiprocessing
import time
class Worker(multiprocessing.Process):
def __init__(self, sec):
super(Worker, self).__init__()
self.sec = sec
def run(self):
time.sleep(self.sec)
print('这是进程的名字', multiprocessing.current_process().name)
time.sleep(self.sec)
if __name__ == '__main__':
print('主进程开始:', multiprocessing.current_process().name)
s_time = time.time()
pro_list = []
for i in range(4):
worker = Worker(2)
pro_list.append(worker)
for p in pro_list:
p.start()
for p in pro_list:
p.join()
print('主进程结束', multiprocessing.current_process().name)
print('一共用时', time.time()-s_time)
|
执行以上程序会输出如下结果:
1
2
3
4
5
6
7
|
主进程开始: MainProcess
这是进程的名字 Worker-1
这是进程的名字 Worker-2
这是进程的名字 Worker-3
这是进程的名字 Worker-4
主进程结束 MainProcess
一共用时 4.018951416015625
|
使用以上两种方式创建多进程时,join()方法与多线程效果相同,多线程设置守护线程的命令是t.setDaemon(True), 而多进程设置守护进程的命令是p.daemon = True
创建进程的类:
Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)。
强调:
1.需要使用关键字的方式来指定参数
2.args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号
参数:
・group参数未使用,值始终为None
・target表示调用对象,即子进程要执行的任务
・args表示调用对象的位置参数元组,args=(1,2,‘egon’,)
・kwargs表示调用对象的字典,kwargs={‘name’:‘egon’,‘age’:18}
・name为子进程的名称
五、Process实例对象的方法属性
start():启动进程,并调用该子进程中的run()。
run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法。
is_alive():如果p仍然运行,返回True。
Process实例对象的属性
・p.name:进程的名称
・p.pid:进程的pid
・p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束
・p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功
例如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self, processname):
Process.__init__(self, name=processname)
def run(self):
print('修改前进程名字:',self.name)
self.name = "ChildProcess1"
print('修改后进程名字:',self.name)
print('进程PID:',self.pid)
print('进程exitcode:',self.exitcode)
time.sleep(5)
print('%s运行结束'%self.name)
#建立子进程t
t=MyProcess('子进程')
#启动进程
t.start()
#判断进程是否活动的。
if t.is_alive():
print("子进程运行中")
else:
print("子进程运行结束")
print("主进程运行结束")
|
执行以上程序会输出如下结果:
1
2
3
4
5
6
7
|
子进程运行中
主进程运行结束
修改前进程名字: 子进程
修改后进程名字: ChildProcess1
进程PID: 6
进程exitcode: None
ChildProcess1运行结束
|
守护进程,表示该进程是不重要的,主进程退出时不需要等待这个进程执行完成。
p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程。守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children
注意,需要在进程start之前设置p.daemon,否则会抛出RuntimeError异常。
例如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self, processname):
Process.__init__(self, name=processname)
def run(self):
time.sleep(5)
print('%s运行结束'%self.name)
#建立子进程t
t=MyProcess('子进程')
#启动进程
t.start()
#判断进程是否活动的。
if t.is_alive():
print("子进程运行中")
else:
print("子进程运行结束")
print("主进程运行结束")
|
执行以上程序会输出如下结果:
1
2
3
|
子进程运行中
主进程运行结束
子进程运行结束
|
例如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self, processname):
Process.__init__(self, name=processname)
def run(self):
time.sleep(5)
print('%s运行结束'%self.name)
#建立子进程t
t=MyProcess('子进程')
t.daemon =True #设置守护进程,主进程结束时,强制子进程结束
#启动进程
t.start()
#判断进程是否活动的。
if t.is_alive():
print("子进程运行中")
else:
print("子进程运行结束")
print("主进程运行结束")
|
执行以上程序会输出如下结果:
join([timeout]):主进程等待p终止(强调:是主进程处于等的状态,而子进程是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程。
例如,没有join处理的时候
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self, processname):
Process.__init__(self, name=processname)
def run(self):
time.sleep(5)
print('%s运行结束'%self.name)
#建立子进程t
t=MyProcess('子进程')
#启动进程
t.start()
print("主进程在等待子进程")
print("主进程运行结束")
|
执行以上程序会输出如下结果:
1
2
3
|
主进程在等待子进程
主进程运行结束
子进程运行结束
|
例如,有join处理的时候
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self, processname):
Process.__init__(self, name=processname)
def run(self):
time.sleep(5)
print('%s运行结束'%self.name)
#建立子进程t
t=MyProcess('子进程')
#启动进程
t.start()
print("主进程在等待子进程")
t.join()
print("主进程运行结束")
|
执行以上程序会输出如下结果:
1
2
3
|
主进程在等待子进程
子进程运行结束
主进程运行结束
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
from multiprocessing import Process
import time
import random
class MyProcess(Process):
def __init__(self, processname):
Process.__init__(self, name=processname)
def run(self):
time.sleep(5)
print('%s运行结束'%self.name)
p1=MyProcess('子进程1')
p2=MyProcess('子进程2')
p3=MyProcess('子进程3')
p4=MyProcess('子进程4')
p1.start()
p2.start()
p3.start()
p4.start()
#有的同学会有疑问:既然join是等待进程结束,那么我像下面这样写,进程不就又变成串行的了吗?当然不是了,必须明确:p.join()是让谁等?很明显p.join()是让主线程等待p的结束,卡住的是主线程而绝非进程p。
#详细解析如下:
#进程只要start就会在开始运行了,所以p1-p4.start()时,系统中已经有四个并发的进程了,而我们p1.join()是在等p1结束,没错p1只要不结束主线程就会一直卡在原地,这也是问题的关键。join是让主线程等,而p1-p4仍然是并发执行的,p1.join的时候,其p2,p3,p4仍然在运行,等p1.join结束,可能p2,p3,p4早已经结束了,这样p2.join,p3.join.p4.join直接通过检测,无需等待。所以4个join花费的总时间仍然是耗费时间最长的那个进程运行的时间
p1.join()
p2.join()
p3.join()
p4.join()
print('主线程')
#上述启动进程与join进程可以简写为
# p_l=[p1,p2,p3,p4]
# for p in p_l:
# p.start()
# for p in p_l:
# p.join()
|
执行以上程序会输出如下结果:
1
2
3
4
5
|
子进程2运行结束
子进程3运行结束
子进程1运行结束
子进程4运行结束
主线程
|
join有一个timeout参数:
当设置守护进程时(也就是p.daemon=true),含义是主进程对于子进程等待timeout的时间将会杀死该子进程,最后退出程序。所以说,如果有10个子进程,全部的等待时间就是每个timeout的累加和。简单的来说,就是给每个子进程一个timeout的时间,让他去执行,时间一到,不管任务有没有完成,直接杀死。
没有设置守护进程时(也就是p.daemon=false),主进程将会等待timeout的累加和这样的一段时间,时间一到,主进程结束,但是并没有杀死子进程,子进程依然可以继续执行,直到子进程全部结束,程序退出。
例如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self, processname):
Process.__init__(self, name=processname)
def run(self):
time.sleep(5)
print('%s运行结束'%self.name)
my_process_list = []
for i in range(5):
my_process = MyProcess(i)
my_process_list.append(my_process)
for i in my_process_list:
i.start()
print("主进程在等待子进程")
#子进程阻塞主进程。子进程运行完毕后,再继续运行主进程
for i in my_process_list:
i.join(1)
print("主进程运行结束")
|
执行以上程序会输出如下结果:
1
2
3
4
5
6
7
|
主进程在等待子进程
MyProcess-1运行结束
1运行结束
3运行结束
2运行结束
4运行结束
主进程运行结束
|
转载请注明本网址。