Lock类、Rlock类

如果多个线程共同对某个数据修改,则可能出现不可预料的结果,为了保证数据的正确性,需要对多个线程进行同步。使用Thread对象的Lock和Rlock可以实现简单的线程同步,这两个对象都有acquire方法和release方法,对于那些需要每次只允许一个线程操作的数据,可以将其操作放到acquire和release方法之间。

先看下不加锁的例子

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from threading import Thread
import time
class MyThread(Thread):
    def __init__(self, threadname):
        Thread.__init__(self, name=threadname)

    def run(self):
        global n
        temp=n
        time.sleep(0.1)
        n=temp-1

n=100
my_thread_list = []
for i in range(100):
    my_thread = MyThread(i)
    my_thread_list.append(my_thread)
    my_thread.start()

for t in my_thread_list:
t.join()

print(n) #因为没有对共享资料锁定,结果不是0

Lock通常被用来实现对共享资源的同步访问。为每一个共享资源创建一个Lock对象,当你需要访问该资源时,调用acquire方法来获取锁对象(如果其它线程已经获得了该锁,则当前线程需等待其被释放),待资源访问完后,再调用release方法释放锁:
acquire([timeout]): 尝试获得锁定。使线程进入同步阻塞状态。
release(): 释放锁。使用前线程必须已获得锁定,否则将抛出异常。

例如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from threading import Thread,Lock
import time
class MyThread(Thread):
    def __init__(self, threadname):
        Thread.__init__(self, name=threadname)

    def run(self):
        lock.acquire()
        global n
        temp=n
        time.sleep(0.1)
        n=temp-1
        lock.release()

lock=Lock()
n=100
my_thread_list = []
for i in range(100):
    my_thread = MyThread(i)
    my_thread_list.append(my_thread)
    my_thread.start()

for t in my_thread_list:
    t.join()

print(n) #结果肯定为0。锁定了共享资源。由原来的并发执行变成串行,牺牲了执行效率保证了数据安全

上面例子中,同时启动了100个线程,但是有且只能有一个线程获取lock锁(也就是执行lock.acquire()语句),对共享资源进行操作。操作完毕后释放锁(也就是执行lock.release()语句),其他的线程才能去争抢锁。得到锁的线程操作共享资源,以此类推。知道100个线程执行完毕。此例中的共享资源是下面的语句块。

1
2
3
4
global n
temp=n
time.sleep(0.1)
n=temp-1

再进一步来看,程序开始时同时启动了100个线程,100个线程同时运行并调用run方法,这时是并发执行的。线程之间相互不影响。执行到锁的时候,只能有一个线程获取锁并操作共享资源,剩余的99个线程只能等待(没有其余的可执行代码),这时由100并发执行变成1个串行。所以对于锁定的共享资源操作来说是串行处理的。

死锁现象

所谓死锁:是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。

例如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from threading import Thread,Lock
import time

mutexA=Lock()
mutexB=Lock()

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()
        time.sleep(5)

    def func1(self):
        mutexA.acquire()
        print('%s 拿到A锁' %self.name)
        mutexB.acquire()
        print('%s 拿到B锁' %self.name)
        mutexB.release()
        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print('%s 拿到B锁' %self.name)
        time.sleep(2)
        mutexA.acquire()
        print('\%s 拿到A锁' %self.name)
        mutexA.release()
        mutexB.release()

for i in range(10):
    t=MyThread()
    t.start()

执行以上程序会输出如下结果:

1
2
3
4
Thread-1 拿到A锁
Thread-1 拿到B锁
Thread-1 拿到B锁
Thread-2 拿到A锁

然后就卡住,死锁了。

原因是Lock(指令锁)是可用的最低级的同步指令。Lock处于锁定状态时,不被特定的线程拥有。
解决方法是RLock(可重入锁)。RLock是一个可以被同一个线程请求多次的同步指令。RLock被某个线程拥有。拥有RLock的线程可以再次调用acquire(),释放锁时需要调用release()相同次数。上面的例子如果使用RLock代替Lock,则不会发生死锁:

与Lock的区别是:

  1. 从所属上,Lock属于全局,不被任何一个线程所拥有。Rlock一旦被某一个线程所获得,那么直到该锁被释放,都属于该线程。
  2. 从使用上,Lock只能被获得一次,然后释放,多次获得将会出现死锁。Rlock可以被多次获得,但是需要注意的是,获得多少次,就必须释放相对应的次数,Rlock有一个计数器,每获得一次,计数加一,每释放一次,计数减一,计数为0时,该锁属于释放状态。

简言之:Lock属于全局,Rlock属于线程。

mutexA=mutexB=threading.RLock() #一个线程拿到锁,counter加1,该线程内又碰到加锁的情况,则counter继续加1,这期间所有其他线程都只能等待,等待该线程释放所有锁,即counter递减到0为止.

例如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from threading import Thread,Lock
import time

mutexA=Lock()
mutexB=Lock()

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()
        time.sleep(5)

    def func1(self):
        mutexA.acquire()
        print('%s 拿到A锁' %self.name)
        mutexB.acquire()
        print('%s 拿到B锁' %self.name)
        mutexB.release()
        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print('%s 拿到B锁' %self.name)
        time.sleep(2)
        mutexA.acquire()
        print('\%s 拿到A锁' %self.name)
        mutexA.release()
        mutexB.release()

for i in range(10):
    t=MyThread()
    t.start()

Condition类

.条件(condition)
条件同步机制:顾名思义,一个线程等待某个特定条件,而另一个线程发出满足特定条件的信号。

Condition(条件变量)通常与一个锁关联。需要在多个Contidion中共享一个锁时,可以传递一个Lock/RLock实例给构造方法,否则它将自己生成一个RLock实例。

构造方法:
Condition([lock/rlock])

实例方法:
acquire([timeout])/release():
调用关联的锁的相应方法。

wait([timeout]):
调用这个方法将使线程进入Condition的等待池等待通知,并释放锁。使用前线程必须已获得锁定,否则将抛出异常。notify(): 调用这个方法将从等待池挑选一个线程并通知,收到通知的线程将自动调用acquire()尝试获得锁定(进入锁定池);其他线程仍然在等待池中。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。

notifyAll():
调用这个方法将通知等待池中所有的线程,这些线程都将进入锁定池尝试获得锁定。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。

下面使用条件同步机制来实现生产者消费者模型。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import threading
import random
import time
class Producer(threading.Thread):
    def __init__(self, integers, condition):
        super(Producer,self).__init__()
        self.integers = integers
        self.condition = condition

    def run(self):
        while True:
            integer = random.randint(0, 1000)
            self.condition.acquire()  #获取条件锁
            print('condition acquired by %s' % threading.current_thread().name)
            self.integers.append(integer)
            print('%d appended to list by %s' % (integer, threading.current_thread().name))
            print('condition notified by %s' % threading.current_thread().name)
            self.condition.notify()   #唤醒消费者线程
            print('condition released by %s' % self.name)
            self.condition.release()  #释放条件锁
            time.sleep(1)

class Consumer(threading.Thread):
    def __init__(self, integers, condition):
        super(Consumer,self).__init__()
        self.integers = integers
        self.condition = condition

    def run(self):
        while True:
            self.condition.acquire()  #获取条件锁
            print('condition acquired by %s' % self.name)
            while True:
                if self.integers:
                    integer = self.integers.pop()
                    print('%d popped from list by %s' % (integer, self.name))
                    break
                print('condition wait by %s' % self.name)
                self.condition.wait()   #等待状态,等待被唤醒,才会继续执行
            print('condition released by %s' % self.name)
            self.condition.release()  #最后释放条件锁

if __name__ == '__main__':
    integers = []
    condition = threading.Condition()
    t1 = Producer(integers, condition)
    t2 = Consumer(integers, condition)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

执行以上程序会输出如下结果:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
condition acquired by Thread-1
600 appended to list by Thread-1
condition notified by Thread-1
condition released by Thread-1
condition acquired by Thread-2
600 popped from list by Thread-2
condition released by Thread-2
condition acquired by Thread-2
condition wait by Thread-2
condition acquired by Thread-1
732 appended to list by Thread-1
condition notified by Thread-1
condition released by Thread-1
732 popped from list by Thread-2
condition released by Thread-2
condition acquired by Thread-2
condition wait by Thread-2
condition acquired by Thread-1
……
……

Event类

线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行。

构造方法:
Event()

实例方法:
isSet():
返回event的状态值,True或者False;

wait():
如果event.isSet()==False将阻塞线程;

set():
设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;

clear():
恢复event的状态值为False。

例如,有多个工作线程尝试链接MySQL,我们想要在链接前确保MySQL服务正常才让那些工作线程去连接MySQL服务器,如果连接不成功,都会去尝试重新连接。那么我们就可以采用threading.Event机制来协调各个工作线程的连接操作.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from threading import Thread,Event
import threading
import time,random

def conn_mysql():
    count=1
    while not event.is_set():
        if count > 3:
        raise TimeoutError('链接超时')
        print('第%s次尝试链接' % (count))
        event.wait(0.5)
        count+=1
    print('<%s>链接成功' %threading.current_thread().getName())

def check_mysql():
    print('[%s]正在检查mysql' % threading.current_thread().getName())
    time.sleep(random.randint(2,4))
    event.set()

if __name__ == '__main__':
event=Event()
conn1=Thread(target=conn_mysql)
conn2=Thread(target=conn_mysql)
check=Thread(target=check_mysql)
conn1.start()
conn2.start()
check.start()

转载请注明本网址。