asyncio
asyncio是用来编写并发代码的库,使用async/await语法。被用作多个提供高性能Python异步框架的基础,包括网络和网站服务,数据库连接库,分布式任务队列等等。是构建IO密集型和高层级结构化网络代码的最佳选择。
asyncio提供一些低层级API以支持库和框架的开发者实现,本节先介绍几个概念。
事件循环event loop
程序开启一个无限的循环,程序员会把一些函数注册到事件循环上。当满足事件发生的时候,调用相应的协程函数。
以下低级函数可用于获取、设置或创建事件循环:
asyncio.get_running_loop()
获取正在运行的事件循环。如果没有正运行的事件循环,抛出RuntimeError异常。此函数只能被协程对象或者回调对象调用。
asyncio.get_event_loop()
获取当前事件循环。如果当前OS线程中没有设置当前事件循环,并且尚未调用set_event_loop(),asyncio将创建一个新的事件循环并将其设置为当前事件循环。
由于此函数具有相当复杂的行为(特别是在使用自定义事件循环策略时),因此最好使用get_running_loop()函数在协程和回调中get_event_loop()。
还可以考虑使用asyncio.run()函数,而不是使用较低级别的函数手动创建和关闭事件循环。
asyncio.set_event_loop(loop)
将loop设置为当前OS线程的当前事件循环。
asyncio.new_event_loop()
创建新的事件循环。
例如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
import time
import asyncio
#定义协程函数对象
async def taskFunc(x):
print('Waiting: ', x)
await asyncio.sleep(x)
#创建协程对象coroutine
coroutine = taskFunc(2)
#创建事件循环loop
loop = asyncio.get_event_loop()
#协程对象coroutine注册到事件循环
loop.run_until_complete(coroutine)
#打印事件循环的类型
print(type(loop))
#打印事件循环对象
print(loop)
|
执行以上程序会输出如下结果:
1
2
3
|
Waiting: 2
<class 'asyncio.unix_events._UnixSelectorEventLoop'>
<_UnixSelectorEventLoop running=False closed=False debug=False>
|
可等待对象awaitable
await用于挂起阻塞的异步调用接口。使用了await语句的对象就是awaitable。主要有三种awaitable对象:协程coroutines, 任务Tasks和未来Futures。
阻塞和await
使用async可以定义协程对象,使用await可以针对耗时的操作进行挂起,就像生成器里的yield一样,函数让出控制权。协程遇到await,事件循环将会挂起该协程,执行别的协程,直到其他的协程也挂起或者执行完毕,再进行下一个协程的执行。
耗时的操作一般是一些IO操作,例如网络请求,文件读取等。我们使用asyncio.sleep函数来模拟IO操作。协程的目的也是让这些IO操作异步化。
例如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
import asyncio
import time
async def task(x):
print('Waiting: ', x)
await asyncio.sleep(x)
return 'Done after {}s'.format(x)
#创建协程对象coroutine
coroutine = task(2)
#创建事件循环loop
loop = asyncio.get_event_loop()
#使用协程对象coroutine来创建task
task = asyncio.ensure_future(coroutine)
print(task)
loop.run_until_complete(task)
print(task)
print('Task result: ', task.result())
|
执行以上程序会输出如下结果:
1
2
3
4
|
<Task pending coro=<task() running at main.py:3>>
Waiting: 2
<Task finished coro=<task() done, defined at main.py:3> result='Done after 2s'>
Task result: Done after 2s
|
在sleep的时候,使用await让出控制权。即当遇到阻塞调用的函数的时候,使用await方法将协程的控制权让出,以便loop调用其他的协程。睡眠x秒后,协程task函数再一次获得控制权,并返回字符串Task result: Done after 2s 。
coroutine协程
通过async关键字定义一个协程(coroutine),协程也是一种对象。协程不能直接运行,需要把协程加入到事件循环(loop),由后者在适当的时候调用协程。asyncio.get_event_loop方法可以获取一个事件循环,然后使用run_until_complete将协程注册到事件循环,并启动事件循环。
例如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
import time
import asyncio
#定义协程函数对象
async def taskFunc(x):
print('Waiting: ', x)
await asyncio.sleep(x)
#创建协程对象coroutine
coroutine = taskFunc(2)
#打印协程的类型
print(type(coroutine))
#打印协程对象
print(coroutine)
#创建事件循环loop
loop = asyncio.get_event_loop()
#协程对象coroutine注册到事件循环
loop.run_until_complete(coroutine)
|
执行以上程序会输出如下结果:
1
2
3
|
<class 'coroutine'>
<coroutine object taskFunc at 0x7f55e51e6830>
Waiting: 2
|
运行协程异步,有三种方法:
方法一,使用asyncio.run(func)函数去调用函数func。
例如:
1
2
3
4
5
6
7
8
9
10
11
|
import asyncio
#定义协程要运行的函数
async def main():
print('hello')
await asyncio.sleep(1)
print('world')
#运行协程
asyncio.run(main())
#打印协程对象
print(main()
|
执行以上程序会输出如下结果:
1
2
3
|
hello
world
<coroutine object main at 0x1053bb7c8>
|
方法二,await语句构成的awaitable对象
例如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
import asyncio
import time
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
async def main():
print(f"started at {time.strftime('%X')}")
await say_after(1, 'hello') #awaitable对象
await say_after(2, 'world') #awaitable对象
print(f"finished at {time.strftime('%X')}")
asyncio.run(main())
|
执行以上程序会输出如下结果:
1
2
3
4
|
started at 17:13:52
hello
world
finished at 17:13:55
|
方法三,asyncio.create_task()把协程封装为Task对象。
例如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
import asyncio
import time
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
async def main():
task1 = asyncio.create_task(
say_after(1, 'hello'))
task2 = asyncio.create_task(
say_after(2, 'world'))
print(f"started at {time.strftime('%X')}")
# Wait until both tasks are completed (should take
# around 2 seconds.)
await task1
await task2
print(f"finished at {time.strftime('%X')}")
asyncio.run(main())
|
执行以上程序会输出如下结果:
1
2
3
4
|
started at 17:14:32
hello
world
finished at 17:14:34
|
Python协程是可等待的,因此可以从其他协程中await:
例如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
import asyncio
async def nested():
return 42
async def main():
# Nothing happens if we just call "nested()".
# A coroutine object is created but not awaited,
# so it *won't run at all*.
nested()
# Let's do it differently now and await it:
print(await nested()) # will print "42".
asyncio.run(main())
|
任务Task
协程对象就是可以挂起的函数,任务则是对协程进一步封装,其中包含任务的各种状态。协程对象不能直接运行,Task对象被用来在事件循环中运行协程。Task对象是Future类的子类。保存了协程运行后的状态,用于未来获取协程的结果。
事件循环使用协同日程调度:一个事件循环每次运行一个Task对象。而一个Task对象会等待一个Future对象完成,该事件循环会运行其他Task、回调或执行IO操作。
使用高层级的asyncio.create_task()函数来创建Task对象,也可用低层级的loop.create_task()或ensure_future()函数。
例如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
import asyncio
import time
#定义协程函数对象
async def task(x):
print('Waiting: ', x)
await asyncio.sleep(x)
#创建协程对象coroutine
coroutine = task(2)
#创建事件循环loop
loop = asyncio.get_event_loop()
#使用协程对象coroutine来创建task
task = loop.create_task(coroutine) #等价于task = asyncio.ensure_future(coroutine)
print(task)
loop.run_until_complete(task)
print(task)
|
执行以上程序会输出如下结果:
1
2
3
|
<Task pending coro=<task() running at main.py:4>>
Waiting: 2
<Task finished coro=<task() done, defined at main.py:4> result=None>
|
创建task后,task在加入事件循环之前是pending状态,执行之后是finished状态。
asyncio.ensure_future(coroutine)和loop.create_task(coroutine)都可以创建一个task。run_until_complete的参数是一个futrue对象。当传入一个协程,其内部会自动封装成Task,Task是Future的子类。isinstance(Task, asyncio.Future)将会输出True。
.Task绑定回调,获取结果
绑定回调,在Task执行完毕的时候可以获取执行的结果,回调的最后一个参数是future对象,通过该对象可以获取协程返回值。
例如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
import time
import asyncio
async def task(x):
print('Waiting: ', x)
await asyncio.sleep(x)
return 'Done after {}s'.format(x)
def callback(future):
print('Callback: ', future.result())
#创建协程对象coroutine
coroutine = task(2)
#创建事件循环loop
loop = asyncio.get_event_loop()
#创建task对象
task = asyncio.ensure_future(coroutine)
#绑定回调
task.add_done_callback(callback)
print(task)
loop.run_until_complete(task)
print(task)
|
执行以上程序会输出如下结果:
1
2
3
4
|
<Task pending coro=<task() running at main.py:3> cb=[callback() at main.py:8]>
Waiting: 2
Callback: Done after 2s
<Task finished coro=<task() done, defined at main.py:3> result='Done after 2s'>
|
coroutine执行结束时候会调用回调函数。并通过参数future获取协程执行的结果。我们创建的task和回调里的future对象,实际上是同一个对象。
.Task不适用绑定,获取结果
回调中我们使用了future对象的result方法。前面不绑定回调的例子中,我们可以看到task有finished状态。在那个时候,可以直接读取task的result方法。
例如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
import time
import asyncio
async def task(x):
print('Waiting {}'.format(x))
return 'Done after {}s'.format(x)
#创建协程对象coroutine
coroutine = task(2)
#创建事件循环loop
loop = asyncio.get_event_loop()
#使用协程对象coroutine来创建task
task = asyncio.ensure_future(coroutine)
print(task)
loop.run_until_complete(task)
print(task)
# task.result()是协程对象的返回值
print('Task result: {}'.format(task.result()))
|
执行以上程序会输出如下结果:
1
2
3
4
|
<Task pending coro=<task() running at main.py:3>>
Waiting 2
<Task finished coro=<task() done, defined at main.py:3> result='Done after 2s'>
Task result: Done after 2s
|
未来Future
代表将来执行或没有执行的任务的结果。它和Task上没有本质的区别。Future也是一个awaitable对象,协程可以await未来对象,用来获取结果。需要注意的是永远不要在面向用户的API中公开Future对象,并且建议创建Future对象的方法是调用loop.create_future()。
例如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
import asyncio
# 定义协程slow_operation
async def slow_operation(future):
await asyncio.sleep(1) # 睡眠
future.set_result('Future is done!') # future设置结果
# 获取事件循环
loop = asyncio.get_event_loop()
# 创建future对象
future = loop.create_future()
print(future)
# 创建任务
task = asyncio.ensure_future(slow_operation(future))
# 阻塞直到future执行完才停止事件
loop.run_until_complete(task)
# 输出future的结果 Future is done!
print(future.result())
print(future)
loop.close()
|
执行以上程序会输出如下结果:
1
2
3
|
<Future pending>
Future is done!
<Future finished result='Future is done!'>
|
转载请注明本网址。