Timer定时器

Timer(定时器)是Thread的派生类,用于在指定时间后调用一个方法。

构造方法:
Timer(interval, function, args=[], kwargs={})
interval: 指定的时间
function: 要执行的方法
args/kwargs: 方法的参数

实例方法:
Timer从Thread派生,没有增加实例方法。

例1,1秒后,打印hello, world

1
2
3
4
5
6
from threading import Timer
def hello():
    print("hello, world")

t = Timer(1, hello)
t.start() # after 1 seconds, "hello, world" will be printed

例2,验证码定时器

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from threading import Timer
import random,time
class Code:
    def __init__(self):
        self.make_cache()

    def make_cache(self,interval=5):
        self.cache=self.make_code()
        print(self.cache)
        self.t=Timer(interval,self.make_cache)
        self.t.start()

    def make_code(self,n=4):
        res=''
        for i in range(n):
            s1=str(random.randint(0,9))
            s2=chr(random.randint(65,90))
            res+=random.choice([s1,s2])
        return res

    def check(self):
        while True:
        inp=input('>>: ').strip()
            if inp.upper() == self.cache:
                print('验证成功',end='\n')
            self.t.cancel()
            break

if __name__ == '__main__':
    obj=Code()
    obj.check()

Semaphore类

信号量对象。信号量管理一个内置的原子计数器,每当调用acquire()时内置计数器-1;调用release() 时内置计数器+1;计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。

构造函数
Semaphore(value=1)
可选参数提供内部计数器的初始值;默认值为1。如果给定的值小于0,则会引发ValueError。

实例方法
acquire(blocking=True, timeout=None) :
获取一个信号量

release():
释放一个一个信号量

实例:(同时只有5个线程可以获得semaphore,即可以限制最大连接数为5):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
from threading import Thread,Semaphore
import threading
import time
def func():
    sm.acquire()
    print('%s get sm' %threading.current_thread().getName())
    time.sleep(3)
    sm.release()

if __name__ == '__main__':
    sm=Semaphore(5)
    for i in range(23):
        t=Thread(target=func)
        t.start()

执行以上程序会输出如下结果:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Thread-1 get sm
Thread-2 get sm
Thread-3 get sm
Thread-5 get sm
Thread-4 get sm
Thread-6 get sm
Thread-8 get sm
Thread-9 get sm
Thread-10 get sm
Thread-7 get sm
Thread-11 get sm
Thread-13 get sm
Thread-15 get sm
Thread-14 get sm
Thread-12 get sm
Thread-16 get sm
Thread-17 get sm
Thread-18 get sm
Thread-20 get sm
Thread-19 get sm
Thread-21 get sm
Thread-22 get sm
Thread-23 get sm

与进程池是完全不同的概念,进程池Pool(4),最大只能产生4个进程,而且从头到尾都只是这四个进程,不会产生新的,而信号量是产生一堆线程/进程,最多只能有信号定义的线程/进程能获得数据运行。

local类

在多线程的模式下,全局变量对于每一个线程都是可见的,并且可以修改,有的时候我们希望线程使用自己的局部变量而不是使用全局变量,因为局部变量只有线程自己能看见,不会影响其他线程,而全局变量的修改必须加锁。threading.local()方法来实现上面的需求。

local是一个小写字母开头的类,用于管理 thread-local(线程局部的)数据。对于同一个local,线程无法访问其他线程设置的属性;线程设置的属性不会被其他线程设置的同名属性替换。

可以把local看成是一个“线程-属性字典”的字典,local封装了从自身使用线程作为 key检索对应的属性字典、再使用属性名作为key检索属性值的细节。

例子1

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
import threading
# 创建全局local对象:
local_school = threading.local()
def process_student():
    #获取当前线程关联的student:
    std = local_school.student
    print('Hello, %s (in %s)' % (std, threading.current_thread().name))

def process_thread(stuname):
    # 绑定ThreadLocal的student:
    local_school.student = stuname
    process_student()

t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()

执行以上程序会输出如下结果:

1
2
Hello, Alice (in Thread-A)
Hello, Bob (in Thread-B)

例子2

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
import threading

local = threading.local()
local.tname = 'main'

def func():
    #子线程的tname变量
    local.tname = 'notmain'
    print(local.tname)

t1 = threading.Thread(target=func)
t1.start()
t1.join()

print(local.tname)  #主线程的tname变量

执行以上程序会输出如下结果:

1
2
notmain
main

ThreadLocal最常用的地方就是为每个线程绑定一个数据库连接,HTTP请求,用户身份信息等,这样一个线程的所有调用到的处理函数都可以访问这些资源。

Barrier

Barrier创建一个”阻碍”,必须达到指定数量的线程后才可以继续。也叫屏障。可以想象成路障、道闸。Python 3.2引入的新功能。

构造方法:
Barrier(parties, action=None, timeout=None)
构建Barrier对象,parties指定通过栅栏所需的线程数,timeout是wait方法未指定时超时的默认值。

线程queue

使用队列进行线程同步,不需要关心锁的问题,因为queue已经帮我们实现了.

先进先出队列

1
2
3
4
5
6
7
8
9
# 先进先出
import queue
q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')
print(q.get())
print(q.get())
print(q.get())

执行以上程序会输出如下结果:

1
2
3
first
second
third

下面使用队列重写生产者消费者模型

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import threading
import queue
import random
import time

class Producer(threading.Thread):
    def __init__(self, queue):
        super(Producer,self).__init__()
        self.queue = queue
    def run(self):
        while True:
            integer = random.randint(0, 1000)
            self.queue.put(integer)
            print('%d put to queue by %s' % (integer, self.name))
            time.sleep(1)

class Consumer(threading.Thread):
    def __init__(self, queue):
        super(Consumer, self).__init__()
        self.queue = queue
    def run(self):
        while True:
            integer = self.queue.get()
            print('%d popped from list by %s' % (integer, self.name))
        self.queue.task_done()

if __name__ == '__main__':
    queue = queue.Queue()
    t1 = Producer(queue)
    t2 = Consumer(queue)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

后进先出队列

1
2
3
4
5
6
7
8
9
# 后进先出
import queue
q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')
print(q.get())
print(q.get())
print(q.get())

执行以上程序会输出如下结果:

1
2
3
third
second
first

class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列

1
2
3
4
5
6
7
8
9
import queue
q=queue.PriorityQueue()
#put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))
print(q.get())
print(q.get())
print(q.get())

执行以上程序会输出如下结果:

1
2
3
(10, 'b')
(20, 'a')
(30, 'c')

Python标准模块–concurrent.futures

1 介绍 concurrent.futures模块提供了高度封装的异步调用接口 ThreadPoolExecutor:线程池,提供异步调用 ProcessPoolExecutor: 进程池,提供异步调用

2 基本方法 submit(fn, *args, **kwargs)
异步提交任务

map(func, *iterables, timeout=None, chunksize=1)
取代for循环submit的操作

shutdown(wait=True)
相当于进程池的pool.close()+pool.join()操作 wait=True,等待池内所有任务执行完毕回收完资源后才继续 wait=False,立即返回,并不会等待池内的任务执行完毕 但不管wait参数为何值,整个程序都会等到所有任务执行完毕 submit和map必须在shutdown之前

result(timeout=None)
取得结果

add_done_callback(fn)
回调函数

八、同步锁 2个需要注意的点:

  1. 线程抢的是GIL锁,GIL锁相当于执行权限,拿到执行权限后才能拿到互斥锁Lock,其他线程也可以抢到GIL,但如果发现Lock仍然没有被释放则阻塞,即便是拿到执行权限GIL也要立刻交出来
  2. join是等待所有,即整体串行,而锁只是锁住修改共享数据的部分,即部分串行,要想保证数据安全的根本原理在于让并发变成串行,join与互斥锁都可以实现,毫无疑问,互斥锁的部分串行效率要更高。

GIL VS Lock
机智的同学可能会问到这个问题,就是既然你之前说过了,Python已经有一个GIL来保证同一时间只能有一个线程来执行了,为什么这里还需要lock?

首先我们需要达成共识:锁的目的是为了保护共享的数据,同一时间只能有一个线程来修改共享的数据。然后,我们可以得出结论:保护不同的数据就应该加不同的锁。
 最后,问题就很明朗了,GIL与Lock是两把锁,保护的数据不一样,前者是解释器级别的(当然保护的就是解释器级别的数据,比如垃圾回收的数据),后者是保护用户自己开发的应用程序的数据,很明显GIL不负责这件事,只能用户自定义加锁处理,即Lock。 过程分析:所有线程抢的是GIL锁,或者说所有线程抢的是执行权限。线程1抢到GIL锁,拿到执行权限,开始执行,然后加了一把Lock,还没有执行完毕,即线程1还未释放Lock,有可能线程2抢到GIL锁,开始执行,执行过程中发现Lock还没有被线程1释放,于是线程2进入阻塞,被夺走执行权限,有可能线程1拿到GIL,然后正常执行到释放Lock。。。这就导致了串行运行的效果。 既然是串行,那我们执行

1
2
3
4
  t1.start()
  t1.join()
  t2.start()
  t2.join()

这也是串行执行啊,为何还要加Lock呢,需知join是等待t1所有的代码执行完,相当于锁住了t1的所有代码,而Lock只是锁住一部分操作共享数据的代码。   因为Python解释器帮你自动定期进行内存回收,你可以理解为python解释器里有一个独立的线程,每过一段时间它起wake up做一次全局轮询看看哪些内存数据是可以被清空的,此时你自己的程序里的线程和py解释器自己的线程是并发运行的,假设你的线程删除了一个变量,py解释器的垃圾回收线程在清空这个变量的过程中的clearing时刻,可能一个其它线程正好又重新给这个还没来及得清空的内存空间赋值了,结果就有可能新赋值的数据被删除了,为了解决类似的问题,python解释器简单粗暴的加了锁,即当一个线程运行时,其它人都不能动,这样就解决了上述的问题,这可以说是Python早期版本的遗留问题。


转载请注明本网址。