asyncio

asyncio是用来编写并发代码的库,使用async/await语法。被用作多个提供高性能Python异步框架的基础,包括网络和网站服务,数据库连接库,分布式任务队列等等。是构建IO密集型和高层级结构化网络代码的最佳选择。

asyncio提供一些低层级API以支持库和框架的开发者实现,本节先介绍几个概念。

事件循环event loop

程序开启一个无限的循环,程序员会把一些函数注册到事件循环上。当满足事件发生的时候,调用相应的协程函数。

以下低级函数可用于获取、设置或创建事件循环:

asyncio.get_running_loop()
获取正在运行的事件循环。如果没有正运行的事件循环,抛出RuntimeError异常。此函数只能被协程对象或者回调对象调用。

asyncio.get_event_loop()
获取当前事件循环。如果当前OS线程中没有设置当前事件循环,并且尚未调用set_event_loop(),asyncio将创建一个新的事件循环并将其设置为当前事件循环。

由于此函数具有相当复杂的行为(特别是在使用自定义事件循环策略时),因此最好使用get_running_loop()函数在协程和回调中get_event_loop()。

还可以考虑使用asyncio.run()函数,而不是使用较低级别的函数手动创建和关闭事件循环。

asyncio.set_event_loop(loop)
将loop设置为当前OS线程的当前事件循环。

asyncio.new_event_loop()
创建新的事件循环。

例如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
import time
import asyncio
#定义协程函数对象
async def taskFunc(x):
    print('Waiting: ', x)
    await asyncio.sleep(x)

#创建协程对象coroutine
coroutine = taskFunc(2)
#创建事件循环loop
loop = asyncio.get_event_loop()
#协程对象coroutine注册到事件循环
loop.run_until_complete(coroutine)

#打印事件循环的类型
print(type(loop))
#打印事件循环对象
print(loop)

执行以上程序会输出如下结果:

1
2
3
Waiting:  2
<class 'asyncio.unix_events._UnixSelectorEventLoop'>
<_UnixSelectorEventLoop running=False closed=False debug=False>

可等待对象awaitable

await用于挂起阻塞的异步调用接口。使用了await语句的对象就是awaitable。主要有三种awaitable对象:协程coroutines, 任务Tasks和未来Futures。

阻塞和await

使用async可以定义协程对象,使用await可以针对耗时的操作进行挂起,就像生成器里的yield一样,函数让出控制权。协程遇到await,事件循环将会挂起该协程,执行别的协程,直到其他的协程也挂起或者执行完毕,再进行下一个协程的执行。 耗时的操作一般是一些IO操作,例如网络请求,文件读取等。我们使用asyncio.sleep函数来模拟IO操作。协程的目的也是让这些IO操作异步化。

例如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
import asyncio
import time
async def task(x):
    print('Waiting: ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

#创建协程对象coroutine
coroutine = task(2)
#创建事件循环loop
loop = asyncio.get_event_loop()
#使用协程对象coroutine来创建task
task = asyncio.ensure_future(coroutine)
print(task)
loop.run_until_complete(task)
print(task)
print('Task result: ', task.result())

执行以上程序会输出如下结果:

1
2
3
4
<Task pending coro=<task() running at main.py:3>>
Waiting:  2
<Task finished coro=<task() done, defined at main.py:3> result='Done after 2s'>
Task result:  Done after 2s

在sleep的时候,使用await让出控制权。即当遇到阻塞调用的函数的时候,使用await方法将协程的控制权让出,以便loop调用其他的协程。睡眠x秒后,协程task函数再一次获得控制权,并返回字符串Task result: Done after 2s 。

coroutine协程

通过async关键字定义一个协程(coroutine),协程也是一种对象。协程不能直接运行,需要把协程加入到事件循环(loop),由后者在适当的时候调用协程。asyncio.get_event_loop方法可以获取一个事件循环,然后使用run_until_complete将协程注册到事件循环,并启动事件循环。

例如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
import time
import asyncio
#定义协程函数对象
async def taskFunc(x):
    print('Waiting: ', x)
    await asyncio.sleep(x)

#创建协程对象coroutine
coroutine = taskFunc(2)

#打印协程的类型
print(type(coroutine))
#打印协程对象
print(coroutine)

#创建事件循环loop
loop = asyncio.get_event_loop()
#协程对象coroutine注册到事件循环
loop.run_until_complete(coroutine)

执行以上程序会输出如下结果:

1
2
3
<class 'coroutine'>
<coroutine object taskFunc at 0x7f55e51e6830>
Waiting:  2

运行协程异步,有三种方法:
方法一,使用asyncio.run(func)函数去调用函数func。

例如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
import asyncio
#定义协程要运行的函数
async def main():
    print('hello')
    await asyncio.sleep(1)
    print('world')

#运行协程
asyncio.run(main())
#打印协程对象
print(main()

执行以上程序会输出如下结果:

1
2
3
hello
world
<coroutine object main at 0x1053bb7c8>

方法二,await语句构成的awaitable对象

例如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"started at {time.strftime('%X')}")

    await say_after(1, 'hello')  #awaitable对象
    await say_after(2, 'world')  #awaitable对象

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

执行以上程序会输出如下结果:

1
2
3
4
started at 17:13:52
hello
world
finished at 17:13:55

方法三,asyncio.create_task()把协程封装为Task对象。

例如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    task1 = asyncio.create_task(
        say_after(1, 'hello'))

    task2 = asyncio.create_task(
        say_after(2, 'world'))

    print(f"started at {time.strftime('%X')}")

    # Wait until both tasks are completed (should take
    # around 2 seconds.)
    await task1
    await task2

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

执行以上程序会输出如下结果:

1
2
3
4
started at 17:14:32
hello
world
finished at 17:14:34

Python协程是可等待的,因此可以从其他协程中await:

例如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
import asyncio

async def nested():
    return 42

async def main():
    # Nothing happens if we just call "nested()".
    # A coroutine object is created but not awaited,
    # so it *won't run at all*.
    nested()

    # Let's do it differently now and await it:
    print(await nested())  # will print "42".

asyncio.run(main())

任务Task

协程对象就是可以挂起的函数,任务则是对协程进一步封装,其中包含任务的各种状态。协程对象不能直接运行,Task对象被用来在事件循环中运行协程。Task对象是Future类的子类。保存了协程运行后的状态,用于未来获取协程的结果。

事件循环使用协同日程调度:一个事件循环每次运行一个Task对象。而一个Task对象会等待一个Future对象完成,该事件循环会运行其他Task、回调或执行IO操作。

使用高层级的asyncio.create_task()函数来创建Task对象,也可用低层级的loop.create_task()或ensure_future()函数。

例如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import asyncio
import time
#定义协程函数对象
async def task(x):
    print('Waiting: ', x)
    await asyncio.sleep(x)

#创建协程对象coroutine
coroutine = task(2)
#创建事件循环loop
loop = asyncio.get_event_loop()
#使用协程对象coroutine来创建task
task = loop.create_task(coroutine) #等价于task = asyncio.ensure_future(coroutine)
print(task)
loop.run_until_complete(task)
print(task)

执行以上程序会输出如下结果:

1
2
3
<Task pending coro=<task() running at main.py:4>>
Waiting:  2
<Task finished coro=<task() done, defined at main.py:4> result=None>

创建task后,task在加入事件循环之前是pending状态,执行之后是finished状态。

asyncio.ensure_future(coroutine)和loop.create_task(coroutine)都可以创建一个task。run_until_complete的参数是一个futrue对象。当传入一个协程,其内部会自动封装成Task,Task是Future的子类。isinstance(Task, asyncio.Future)将会输出True。

.Task绑定回调,获取结果 绑定回调,在Task执行完毕的时候可以获取执行的结果,回调的最后一个参数是future对象,通过该对象可以获取协程返回值。

例如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
import time
import asyncio
async def task(x):
    print('Waiting: ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

def callback(future):
    print('Callback: ', future.result())

#创建协程对象coroutine
coroutine = task(2)
#创建事件循环loop
loop = asyncio.get_event_loop()
#创建task对象
task = asyncio.ensure_future(coroutine)
#绑定回调
task.add_done_callback(callback)
print(task)
loop.run_until_complete(task)
print(task)

执行以上程序会输出如下结果:

1
2
3
4
<Task pending coro=<task() running at main.py:3> cb=[callback() at main.py:8]>
Waiting:  2
Callback:  Done after 2s
<Task finished coro=<task() done, defined at main.py:3> result='Done after 2s'>

coroutine执行结束时候会调用回调函数。并通过参数future获取协程执行的结果。我们创建的task和回调里的future对象,实际上是同一个对象。

.Task不适用绑定,获取结果 回调中我们使用了future对象的result方法。前面不绑定回调的例子中,我们可以看到task有finished状态。在那个时候,可以直接读取task的result方法。

例如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
import time
import asyncio
async def task(x):
    print('Waiting {}'.format(x))
    return 'Done after {}s'.format(x)

#创建协程对象coroutine
coroutine = task(2)
#创建事件循环loop
loop = asyncio.get_event_loop()
#使用协程对象coroutine来创建task
task = asyncio.ensure_future(coroutine)
print(task)
loop.run_until_complete(task)
print(task)
# task.result()是协程对象的返回值
print('Task result: {}'.format(task.result()))

执行以上程序会输出如下结果:

1
2
3
4
<Task pending coro=<task() running at main.py:3>>
Waiting 2
<Task finished coro=<task() done, defined at main.py:3> result='Done after 2s'>
Task result: Done after 2s

未来Future

代表将来执行或没有执行的任务的结果。它和Task上没有本质的区别。Future也是一个awaitable对象,协程可以await未来对象,用来获取结果。需要注意的是永远不要在面向用户的API中公开Future对象,并且建议创建Future对象的方法是调用loop.create_future()。

例如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import asyncio
# 定义协程slow_operation
async def slow_operation(future):
    await asyncio.sleep(1)  # 睡眠
    future.set_result('Future is done!')  # future设置结果

# 获取事件循环
loop = asyncio.get_event_loop()
# 创建future对象
future = loop.create_future()

print(future)

# 创建任务
task = asyncio.ensure_future(slow_operation(future))
# 阻塞直到future执行完才停止事件
loop.run_until_complete(task)

# 输出future的结果 Future is done!
print(future.result())

print(future)
loop.close()

执行以上程序会输出如下结果:

1
2
3
<Future pending>
Future is done!
<Future finished result='Future is done!'>

转载请注明本网址。