0%

python协程学习笔记

什么是协程

简介

asyncio 是用来编写 并发 代码的库,使用async/await语法。 asyncio 被用作多个提供高性能Python异步框架的基础,包括网络和网站服务,数据库连接库,分布式任务队列等等。 asyncio 往往是构建 IO 密集型和高层级 结构化 网络代码的最佳选择。

协程有四个状态

  • GEN_CREATED 等待开始执行
  • GEN_RUNIING 解释器正在执行
  • GEN_SUSPENDED 在yield表达式处暂停
  • GEN_cLOSED 执行结束

当前状态使用inspect.getgeneratorstate来确定:

import inspect

inspect.getgeneratorstate(my_coroutine)

协程的优势

  • 执行效率极高,因为子程序切换(函数)不是线程切换,由程序自身控制,没有切换线程的开销。所以与多线程相比,线程的数量越多,协程性能的优势越明显。
  • 不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在控制共享资源时也不需要加锁,因此执行效率高很多。

notes: 协程可以处理IO密集型程序的效率问题,但是处理CPU密集型不是它的长处,如要充分发挥CPU利用率可以结合多进程+协程。

协程库asyncio中的基本概念

Eventloop

Eventloop可以说是asyncio应用的核心,中央总控,Eventloop实例提供了注册、取消、执行任务和回调的方法。简单来说,就是我们可以把一些异步函数注册到这个事件循环上,事件循环回循环执行这些函数(每次只能执行一个),如果当前正在执行的函数在等待I/O返回,那么事件循环就会暂停它的执行去执行其他函数。当某个函数完成I/O后会恢复,等到下次循环到它的时候就会继续执行。

Coroutine

协程本质是一个函数, 协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import asyncio
import time


async def a():
print('Suspending a')
await asyncio.sleep(3)
print('Resuming a')


async def b():
print('Suspending b')
await asyncio.sleep(1)
print('Resuming b')


async def main():
start = time.perf_counter()
await asyncio.gather(a(), b())
print(f'{main.__name__} Cost: {time.perf_counter() - start}')


if __name__ == '__main__':
asyncio.run(main())

执行上述代码,得到输出如下:

Suspending a
Suspending b
Resuming b
Resuming a
main Cost: 3.0005005000000002

Future

Future是表示一个“未来”对象, 类似于JavaScript中的promise,当异步操作结束后会把最终结果设置到这个Future对象上,Future是对协程的封装。

Task

Eventloop除了支持协程,还支持注册Future和Task2种类型的对象,而Future是协程的封装,Future对象提供了很多任务方法(如完成后的回调,取消,设置任务结果等等),但是一般情况下开发者不需要操作Future这种底层对象,而是直接用Future的子类Task协同的调度协程来实现并发。那么什么是Task呢?下面介绍下:

一个与Future类似的对象,可运行Python协程。非线程安全。 Task对象被用来在事件循环中运行协程。如果一个协程在等待一个Future对象,Task对象会挂起该协程的执行并等待该Future对象完成。当该Future对象完成被打包的协程将恢复执行。 事件循环使用协同日程调度: 一个事件循环每次运行一个Task对象。而一个Task对象会等待一个Future对象完成,该事件循环会运行其他Task、回调或执行IO操作。

代码示例:

async def a():
print('Suspending a')
await asyncio.sleep(3)
print('Resuming a')
task = asyncio.ensure_future(a())
loop.run_until_complete(task)

输出:

Suspending a
Resuming a

async/await 关键字

python3.5 用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口。其作用在一定程度上类似于yield。

asyncio中的一些常见用法和区别

asyncio.gather 和 asyncio.wait

  1. asyncio.gather能收集协程的结果,而且会按照输入协程的顺序保存对应协程的执行结果,而asyncio.wait的返回值有两项,第一项是完成的任务列表,第二项表示等待完成的任务列表。
  2. asyncio.wait支持接受一个参数return_when,在默认情况下,asyncio.wait会等待全部任务完成(return_when=’ALL_COMPLETED’),它还支持FIRST_COMPLETED(第一个协程完成就返回)和FIRST_EXCEPTION(出现第一个异常就返回)

示例:

import asyncio


async def a():
print('Suspending a')
await asyncio.sleep(3)
print('Resuming a')
return 'A'


async def b():
print('Suspending b')
await asyncio.sleep(1)
print('Resuming b')
return 'B'


async def fun1():
return_value_a, return_value_b = await asyncio.gather(a(), b())
print(return_value_a, return_value_b)


async def fun2():
done, pending = await asyncio.wait([a(), b()])
print(done)
print(pending)
task = list(done)[0]
print(task)
print(task.result())


asyncio.run(fun1())
asyncio.run(fun2())

gather输出:

Suspending a
Suspending b
Resuming b
Resuming a
A B

await输出:

Suspending a
Suspending b
Resuming b
Resuming a
{<Task finished coro=<b() done, defined at D:/code/lab/MPP/asyncio1.py:13> result='B'>, <Task finished coro=<a() done, defined at D:/code/lab/MPP/asyncio1.py:6> result='A'>}
set()
<Task finished coro=<b() done, defined at D:/code/lab/MPP/asyncio1.py:13> result='B'>
B

await与yield

await用于挂起阻塞的异步调用接口,其作用在一定程度上类似于yield。
但二者功能上不兼容,不能再生成器中使用await,也不能再async定义的协程中使用yield。

除此之外,yield from后面可接可迭代对象,也可接future对象/协程对象。
await后面必须要接future对象/协程对象。

yield和yield from*

1 yield在生成器中有中断的功能,可以传出值,也可以从函数外部接收值,而yield from的实现就是简化了yield操作。

我们来看一个案例:

def generator_1(titles):
yield titles
def generator_2(titles):
yield from titles

titles = ['Python','Java','C++']
for title in generator_1(titles):
print('生成器1:',title)
for title in generator_2(titles):
print('生成器2:',title)

执行结果:

生成器1: ['Python', 'Java', 'C++']
生成器2: ['Python', 'Java', 'C++']

2 yield from功能还不止于此,它还有一个主要的功能是省去了很多异常的处理,不再需要我们手动编写,其内部已经实现大部分异常处理

下面这个例子 通过生成器来实现一个整数加和的程序,通过send()函数向生成器中传入要加和的数字,然后最后以返回None结束,total保存最后加和的总数。

def generator_1():
total = 0
while True:
x = yield
print('加',x)
if not x:
break
total += x
return total
def generator_2(): # 委托生成器
while True:
total = yield from generator_1() # 子生成器
print('加和总数是:',total)
def main(): # 调用方
g1 = generator_1()
g1.send(None)
g1.send(2)
g1.send(3)
g1.send(None)
# g2 = generator_2()
# g2.send(None)
# g2.send(2)
# g2.send(3)
# g2.send(None)

main()

执行结果如下。可见对于生成器g1,在最后传入None后,程序退出,报StopIteration异常并返回了最后total值是5。

加 2
加 3
加 None
------------------------------------------
StopIteration
<ipython-input-37-cf298490352b> in main()
---> 19 g1.send(None)
StopIteration: 5

如果把g1.send()那5行注释掉,解注下面的g2.send()代码,则结果如下。可见yield from封装了处理常见异常的代码。对于g2即便传入None也不报异常,其中total = yield from generator_1()返回给total的值是generator_1()最终的return total

2
3
None
加和总数是: 5

借用上述例子,这里有几个概念需要理一下:

子生成器:yield from后的generator_1()生成器函数是子生成器
委托生成器:generator_2()是程序中的委托生成器,它负责委托子生成器完成具体任务。
调用方:main()是程序中的调用方,负责调用委托生成器。

yield from在其中还有一个关键的作用是:建立调用方和子生成器的通道,

在上述代码中main()每一次在调用send(value)时,value不是传递给了委托生成器generator_2(),而是借助yield from传递给了子生成器generator_1()中的yield。同理,子生成器中的数据也是通过yield直接发送到调用方main()中。

如何定义/创建协程

只要在一个函数前面加上async关键字,这个函数对象就是一个协程,通过isinstance函数,可以证明它确实是coroutine类型

from collections.abc import Coroutine

async def hello(name):
print('Hello,', name)

if __name__ == '__main__':
# 生成协程对象,并不会运行函数内的代码
coroutine = hello("World")

# 检查是否是协程 Coroutine 类型
print(isinstance(coroutine, Coroutine)) # True

生成器是协程的基础,那我们是不是有办法,将一个生成器,直接变成协程使用呢。答案是有的。

import asyncio
from collections.abc import Generator, Coroutine

'''
只要在一个生成器函数头部用上 @asyncio.coroutine 装饰器
就能将这个函数对象,【标记】为协程对象。注意这里是【标记】,划重点。
实际上,它的本质还是一个生成器。
标记后,它实际上已经可以当成协程使用。后面会介绍。
'''
@asyncio.coroutine
def hello():
# 异步调用asyncio.sleep(1):
yield from asyncio.sleep(1)


if __name__ == '__main__':
coroutine = hello()
print(isinstance(coroutine, Generator)) # True
print(isinstance(coroutine, Coroutine)) # False

要真正运行一个协程,asyncio提供了主要三种机制

  1. 使用asyncio.run()函数来运行最高层级入口点main()函数
>>> import asyncio

>>> async def main():
... print('hello')
... await asyncio.sleep(1)
... print('world')

>>> asyncio.run(main())
hello
world
  1. 等待一个协程
    以下代码段会在等待 1 秒后打印 “hello”,然后 再次 等待 2 秒后打印 “world”:
import asyncio
import time

async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)

async def main():
print(f"started at {time.strftime('%X')}")

await say_after(1, 'hello')
await say_after(2, 'world')

print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

输出

started at 17:13:52
hello
world
finished at 17:13:55
  1. 使用asyncio.create_task()函数来并发运行作为asyncio任务的多个协程
    修改以上事例,并发运行两个say_after协程
import asyncio
import time


async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)


async def main():
task1 = asyncio.create_task(
say_after(1, 'hello')
)
task2 = asyncio.create_task(
say_after(2, 'world')
)
print(f"started at {time.strftime('%X')}")
await task1
await task2
print(f"finished at {time.strftime('%X')}")


if __name__ == '__main__':
asyncio.run(main())

输出:

started at 19:09:59
hello
world
finished at 19:10:01 # 比之前的运行时间快了1

可等待对象

如果一个对象可以在await语句中使用,那么它就是可等待对象。

可等待对象有三种主要类型:协程,任务,Future

协程
Python协程属于可等待对象,因此可以在其他协程中被等待

import asyncio


async def nested():
return 42


async def main():
nested() # 什么都不会发生,一个协程对象呗创建但是处于await状态
print(await nested()) # 将会返回42


asyncio.run(main())

重要 在本文芳中“协程”可用来表示两个紧密关联的概念
1.协程函数 定义形式为async def的函数 2. 协程对象 调用协程函数所返回的对象

任务
任务被用来设置日程以便 并发执行协程
当一个协程通过asyncio.create_task()等函数被打包为一个任务,该协程将自动排入日程准备立即运行

import asyncio

async def nested():
return 42

async def main():
task = asyncio.create_task(nested())
await task

asyncio.run(main())

Future对象
Future是一种特殊的低层级可等待对象,表示一个异步操作的最终结果。当一个future对象被等待,这意味着协程将报错等待直到该future对象在其他地方操作完毕,在asyncio中需要future对象以便允许通过async/await使用基于回调的代码。通常情况下没有必要在应用层级的代码中创建future对象。future对象有时会由库和某些asyncio API暴露给用户,用户可等待对象。

async def main():
await function_that_returns_a_future_object()

await asyncio.gather(
function_that_returens_a_future_object(),
some_python_coroutine()
)

协程如何工作

协程完整的工作流程是这样的:

  • 定义/创建协程对象
  • 将协程转为task任务
  • 定义事件循环对象容器
  • 将task任务扔进事件循环对象中出发

示例:

import asyncio


async def hello(name):
print('Hello', name)


if __name__ == '__main__':
# 定义协程对象
coroutine = hello('world')

# 定义事件循环对象容器
loop = asyncio.get_event_loop()

# 将协程转为task任务
task = loop.create_task(coroutine)

# 将task任务扔进事件循环对象中并触发
loop.run_until_complete(task)

绑定回调函数返回结果

异步IO的实现原理,就是在IO高的地方挂起,等IO结束后,再继续执行。在绝大部分时候,我们后续的代码的执行是需要依赖IO的返回值的,这就要用到回调了。回调的实现有两种:

一种是绝大部分程序员喜欢的,利用同步编程实现的回调:

import asyncio
import time


async def _sleep(x):
time.sleep(2)
return '暂停了{}秒'.format(x)


corotine = _sleep(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(corotine)
# task = loop.create_task(corotine) 两种语法二选一
loop.run_until_complete(task)

print('返回结果: {}'.format(task.result()))

还有一种是通过asyncio自带的添加回调函数功能来实现

import time
import asyncio


async def _sleep(x):
time.sleep(2)
return '暂停了{}秒'.format(x)


def callback(future):
print("这里是回调函数,获取返回结果是: {}".format(future.result()))


corotine = _sleep(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(corotine)
# task = loop.create_task(corotine)

task.add_done_callback(callback)
loop.run_until_complete(task)

协程与任务

运行asyncio程序

asyncio.run(coro, *, debug=False)

执行 coroutine coro 并返回结果。

此函数运行传入的协程,负责管理 asyncio 事件循环并 完结异步生成器。

当有其他 asyncio 事件循环在同一线程中运行时,此函数不能被调用。

如果 debug 为 True,事件循环将以调试模式运行。

此函数总是会创建一个新的事件循环并在结束时关闭之。它应当被用作 asyncio 程序的主入口点,理想情况下应当只被调用一次。

import asyncio


async def nested():
return 42


async def main():
await asyncio.sleep(1)
print('hello')


asyncio.run(main())

创建任务

asyncio.create_task(coro, *, name=None)

将 coro 协程 打包为一个 Task 排入日程准备执行。返回 Task 对象。

If name is not None, it is set as the name of the task using Task.set_name().

该任务会在 get_running_loop() 返回的循环中执行,如果当前线程没有在运行的循环则会引发 RuntimeError。

async def coro():
...

# In Python 3.7+
task = asyncio.create_task(coro())
...

# This works in all Python versions but is less readable
task = asyncio.ensure_future(coro())
...

休眠

coroutine asyncio.sleep(delay, result=None, *, loop=None)¶

阻塞 delay 指定的秒数。

如果指定了 result,则当协程完成时将其返回给调用者。

sleep() 总是会挂起当前任务,以允许其他任务运行。

Deprecated since version 3.8, will be removed in version 3.10: loop 形参。

以下协程示例运行 5 秒,每秒显示一次当前日期:

import asyncio
import datetime

async def display_date():
loop = asyncio.get_running_loop()
end_time = loop.time() + 5.0
while True:
print(datetime.datetime.now())
if (loop.time() + 1.0) >= end_time:
break
await asyncio.sleep(1)

asyncio.run(display_date())

并发运行任务

awaitable asyncio.gather(*aws, loop=None, return_exceptions=False)

并发 运行 aws 序列中的 可等待对象。

如果 aws 中的某个可等待对象为协程,它将自动作为一个任务加入日程。

如果所有可等待对象都成功完成,结果将是一个由所有返回值聚合而成的列表。结果值的顺序与 aws 中可等待对象的顺序一致。

如果 return_exceptions 为 False (默认),所引发的首个异常会立即传播给等待 gather() 的任务。aws 序列中的其他可等待对象 不会被取消 并将继续运行。

如果 return_exceptions 为 True,异常会和成功的结果一样处理,并聚合至结果列表。

如果 gather() 被取消,所有被提交 (尚未完成) 的可等待对象也会 被取消。

如果 aws 序列中的任一 Task 或 Future 对象 被取消,它将被当作引发了 CancelledError 一样处理 – 在此情况下 gather() 调用 不会 被取消。这是为了防止一个已提交的 Task/Future 被取消导致其他 Tasks/Future 也被取消。

import asyncio


async def factorial(name, number):
f = 1
for i in range(2, number + 1):
print(f"task {name}: Compute factorial {i} ...")
await asyncio.sleep(1)
f *= i
print(f"task {name}: factorial ({number}) = {f}")


async def main():
await asyncio.gather(
factorial("A", 2),
factorial("B", 3),
factorial("C", 4)
)


asyncio.run(main())
# 输出
# task A: Compute factorial 2 ...
# task B: Compute factorial 2 ...
# task C: Compute factorial 2 ...
# task A: factorial (2) = 2
# task B: Compute factorial 3 ...
# task C: Compute factorial 3 ...
# task B: factorial (3) = 6
# task C: Compute factorial 4 ...
# task C: factorial (4) = 24

屏蔽取消操作

awaitable asyncio.shield(aw, *, loop=None)

保护一个 可等待对象 防止其被 取消。

如果 aw 是一个协程,它将自动作为任务加入日程。

以下语句:
res = await shield(something())
相当于:
res = await something()

不同之处 在于如果包含它的协程被取消,在 something()中运行的任务不会被取消。从 something()的角度看来,取消操作并没有发生。然而其调用者已被取消,因此 “await” 表达式仍然会引发 CancelledError。

如果通过其他方式取消 something() (例如在其内部操作) 则 shield() 也会取消。

如果希望完全忽略取消操作 (不推荐) 则 shield() 函数需要配合一个 try/except 代码段,如下所示:

try:
res = await shield(something())
except CancelledError:
res = None

超时

coroutine asyncio.wait_for(aw, timeout, *, loop=None)¶

等待 aw 可等待对象 完成,指定 timeout 秒数后超时。

如果 aw 是一个协程,它将自动作为任务加入日程。

timeout 可以为 None,也可以为 float 或 int 型数值表示的等待秒数。如果 timeout 为 None,则等待直到完成。

如果发生超时,任务将取消并引发 asyncio.TimeoutError.

要避免任务 取消,可以加上 shield()。

函数将等待直到目标对象确实被取消,所以总等待时间可能超过 timeout 指定的秒数。

如果等待被取消,则 aw 指定的对象也会被取消。

import asyncio


async def eternity():
await asyncio.sleep(3600)
print('sb!')


async def main():
try:
await asyncio.wait_for(eternity(), timeout=1.0)
except asyncio.TimeoutError:
print('timeout!')


asyncio.run(main())

简单等待

coroutine asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

并发运行 aws 指定的 可等待对象 并阻塞线程直到满足 return_when 指定的条件。

返回两个 Task/Future 集合: (done, pending)。

用法:
done, pending = await asyncio.wait(aws)
如指定 timeout (float 或 int 类型)则它将被用于控制返回之前等待的最长秒数。

请注意此函数不会引发 asyncio.TimeoutError。当超时发生时,未完成的 Future 或 Task 将在指定秒数后被返回。

return_when 指定此函数应在何时返回。它必须为以下常数之一:

常数 描述
FIRST_COMPLETED 函数将在任意可等待对象结束或取消时返回。
FIRST_EXCEPTION 函数将在任意可等待对象因引发异常而结束时返回。当没有引发任何异常时它就相当于 ALL_COMPLETED。
ALL_COMPLETED 函数将在所有可等待对象结束或取消时返回。

与 wait_for() 不同,wait() 在超时发生时不会取消可等待对象。

来自其他线程的日程安排

asyncio.run_coroutine_threadsafe(coro, loop)

向指定事件循环提交一个协程。线程安全。

返回一个 concurrent.futures.Future 以等待来自其他 OS 线程的结果。

此函数应该从另一个 OS 线程中调用,而非事件循环运行所在线程。示例:

# Create a coroutine
coro = asyncio.sleep(1, result=3)

# Submit the coroutine to a given loop
future = asyncio.run_coroutine_threadsafe(coro, loop)

# Wait for the result with an optional timeout argument
assert future.result(timeout) == 3

task对象

class asyncio.Task(coro, *, loop=None, name=None)

一个与 Future 类似 的对象,可运行 Python 协程。非线程安全。

Task 对象被用来在事件循环中运行协程。如果一个协程在等待一个 Future 对象,Task 对象会挂起该协程的执行并等待该 Future 对象完成。当该 Future 对象 完成,被打包的协程将恢复执行。

事件循环使用协同日程调度: 一个事件循环每次运行一个 Task 对象。而一个 Task 对象会等待一个 Future 对象完成,该事件循环会运行其他 Task、回调或执行 IO 操作。

使用高层级的 asyncio.create_task() 函数来创建 Task 对象,也可用低层级的 loop.create_task() 或 ensure_future() 函数。不建议手动实例化 Task 对象。

要取消一个正在运行的 Task 对象可使用 cancel() 方法。调用此方法将使该 Task 对象抛出一个 CancelledError 异常给打包的协程。如果取消期间一个协程正在等待一个 Future 对象,该 Future 对象也将被取消。

cancelled() 可被用来检测 Task 对象是否被取消。如果打包的协程没有抑制 CancelledError 异常并且确实被取消,该方法将返回 True。

asyncio.Task 从 Future 继承了其除 Future.set_result() 和 Future.set_exception() 以外的所有 API。

Task 对象支持 contextvars 模块。当一个 Task 对象被创建,它将复制当前上下文,然后在复制的上下文中运行其协程。

协程中的多任务

asyncio实现并发,就需要多个协程来完成任务,每当有任务阻塞的时候就await,然后其他协程继续工作。

import time
import asyncio


# 协程函数
async def do_some_work(x):
print('Waiting: ', x)
await asyncio.sleep(x)
return 'Done after {}s'.format(x)


if __name__ == '__main__':
start = time.time()
# 协程对象
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)
# 将协程转成task, 并组成list
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
# loop.run_until_complete(asyncio.gather(*tasks)) 另一种写法

for task in tasks:
print('Task ret: ', task.result())
end = time.time()
print('Total run: {}s'.format(end - start))

输出:

Waiting:  1
Waiting: 2
Waiting: 4
Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 4s
Total run: 4.0031304359436035s

协程中的嵌套

使用async可以定义协程,协程用于耗时的io操作,我们也可以封装更多的io操作过程,这样就实现了嵌套的协程,即一个协程中await了另外一个协程,如此连接起来。

import asyncio


# 用于内部的协程函数
async def do_some_work(x):
print('Waiting: ', x)
await asyncio.sleep(x)
return 'Done after {}s'.format(x)


# 外部的协程函数
async def main():
# 创建三个协程对象
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]

dones, pedings = await asyncio.wait(tasks)

for task in dones:
print('Task ret: ', task.result())

if __name__ == '__main__':
loop = asyncio.get_event_loop()
coroutine = main()
task = loop.create_task(coroutine)
loop.run_until_complete(task)

仔细查看,可以发现这个例子完全是由 上面「协程中的并发」例子改编而来。结果完全一样。只是把创建协程对象,转换task任务,封装成在一个协程函数里而已。外部的协程,嵌套了一个内部的协程。

协程中的状态

协程中主要有以下几种状态:

  • Pending: 创建future,还未执行
  • Running:事件循环正在调用执行任务
  • Done: 任务执行完毕
  • Cancelled: Task被取消后的状态

动态添加协程

import asyncio
import time
from queue import Queue
from threading import Thread


def start_loop(loop):
# 一个在后台永远运行的事件循环
asyncio.set_event_loop(loop)
loop.run_forever()


async def do_sleep(x, queue, msg=""):
await asyncio.sleep(x)
queue.put(msg)


if __name__ == '__main__':
queue = Queue()
new_loop = asyncio.new_event_loop()

# 定义一个线程,并传入一个事件循环对象
t = Thread(target=start_loop, args=(new_loop, ))
t.start()

# 动态添加两个协程
# 这种方法,在主线程是异步的
asyncio.run_coroutine_threadsafe(do_sleep(6, queue, "第一个"), new_loop)
asyncio.run_coroutine_threadsafe(do_sleep(3, queue, "第二个"), new_loop)

while True:
msg = queue.get()
print("{}协程运行完...".format(msg))
print(time.ctime())

案例

使用asyncio模拟有限状态机

限状态机(finite state machine or automaton, FSA)是一个数学模型,不仅在工程领域应用广泛,在科学领域也很著名,例如数学和计算机科学等。我们要模拟的状态机如下图所示:
finite-state-machine
在上图中,可以看到我们的系统有 S1, S2, S3, S4 四个状态, 0 和 1 是状态机可以从一个状态到另一个状态的值(这个过程叫做转换)。例如在本实验中,只有当只为1的时候, S0 可以转换到 S1 ,当只为0的时候, S0 可以转换到 S2 .Python代码如下,状态模拟从 S0 开始,叫做 初始状态 ,最后到 S4 ,叫做 结束状态

代码实现:

import asyncio
import time
from random import randint


@asyncio.coroutine
def StartState():
print("Start state called \n")
input_value = randint(0, 1)
time.sleep(1)
if input_value == 0:
result = yield from State2(input_value)
else:
result = yield from State1(input_value)
print("Resume of the Transition: \nStart State calling " + result)


@asyncio.coroutine
def State1(transition_value):
outputValue = str("State 1 with transition value = {} \n ".format(transition_value))
input_value = randint(0, 1)
time.sleep(1)
print("...Evaluating in State1...")
if input_value == 0:
result = yield from State3(input_value)
else:
result = yield from State2(input_value)
result = "State 1 calling " + result
return outputValue + str(result)


@asyncio.coroutine
def State2(transition_value):
outputValue = str("State 2 with transition value = {} \n".format(transition_value))
input_value = randint(0, 1)
time.sleep(1)
print('...Evaluating in State2...')
if input_value == 0:
result = yield from State1(input_value)
else:
result = yield from State3(input_value)
result = "State 2 calling " + result
return outputValue + str(result)


@asyncio.coroutine
def State3(transition_value):
outputValue = str("State 3 with transition value = {} \n".format(transition_value))
input_value = randint(0, 1)
time.sleep(1)
print("...Evaluating in State3...")
if input_value == 0:
result = yield from State1(input_value)
else:
result = yield from EndState(input_value)
result = "State 3 calling " + result
return outputValue + str(result)


@asyncio.coroutine
def EndState(transition_value):
outputValue = str("End State with transition value = {} \n ".format(transition_value))
print("...Stop Computation...")
return outputValue


if __name__ == '__main__':
print('Finite State Machine simulation with Asyncio Coroutine')
loop = asyncio.get_event_loop()
loop.run_until_complete(StartState())

输出(每次可能不尽相同):

Finite State Machine simulation with Asyncio Coroutine
Start state called
...Evaluating in State2...
...Evaluating in State1...
...Evaluating in State2...
...Evaluating in State1...
...Evaluating in State3...
...Evaluating in State1...
...Evaluating in State3...
...Evaluating in State1...
...Evaluating in State3...
...Stop Computation...
Resume of the Transition:
Start State calling State 2 with transition value = 0
State 2 calling State 1 with transition value = 0
State 1 calling State 2 with transition value = 1
State 2 calling State 1 with transition value = 0
State 1 calling State 3 with transition value = 0
State 3 calling State 1 with transition value = 0
State 1 calling State 3 with transition value = 0
State 3 calling State 1 with transition value = 0
State 1 calling State 3 with transition value = 0
State 3 calling End State with transition value = 1

通过 random 模块的 randint(0, 1) 函数生成了 input_value 的值,决定了下一个转换状态。此函数随机生成1或0:

input_value = randint(0, 1)

得到 input_value 的值之后,通过 yield from 命令调用下一个协程。

if (input_value == 0):
result = yield from State2(input_value)
else:
result = yield from State1(input_value)

result 是下一个协程返回的string,这样我们在计算的最后就可以重新构造出计算过程。

使用asyncio控制任务

Asyncio模块为我们提供了 asyncio.Task(coroutine) 方法来处理计算任务,它可以调度协程的执行。任务对协程对象在事件循环的执行负责。如果被包裹的协程要从future yield,那么任务会被挂起,等待future的计算结果。

当future计算完成,被包裹的协程将会拿到future返回的结果或异常(exception)继续执行。另外,需要注意的是,事件循环一次只能运行一个任务,除非还有其它事件循环在不同的线程并行运行,此任务才有可能和其他任务并行。当一个任务在等待future执行的期间,事件循环会运行一个新的任务。

import asyncio
import time


@asyncio.coroutine
def factorial(number):
f = 1
for i in range(2, number + 1):
print("Asyncio.Task: Compute factorial({})".format(i))
yield from asyncio.sleep(1)
f *= i
print("Asyncio.Task -factorial({}) = {}".format(number, f))


@asyncio.coroutine
def fibonacci(number):
a, b = 0, 1
for i in range(number):
print('Asyncio.Task: Compute fibonacci ({})'.format(i))
yield from asyncio.sleep(1)
a, b = b, a + b
print("Asyncio.Task - fibonacci({}) = {}".format(number, a))


@asyncio.coroutine
def binomialCoeff(n, k):
result = 1
for i in range(1, k + 1):
result = result * (n - i + 1) / i
print("Asyncio.Task: Compute binomialCoeff ({})".format(i))
yield from asyncio.sleep(1)
print("Asyncio.Task - binomialCoeff({0}, {1}) = {2}".format(n, k, result))


if __name__ == '__main__':
start = time.perf_counter()
loop = asyncio.get_event_loop()
coroutine1 = factorial(10)
coroutine2 = fibonacci(10)
coroutine3 = binomialCoeff(20, 10)
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
end = time.perf_counter()
print("Total run {} seconds.".format(end - start))

在这个例子中,我们定义了三个协程, factorial, fibonacci 和 binomialCoeff ,每一个都带有 asyncio.coroutine 装饰器:

@asyncio.coroutine
def factorial(number):
do Something

@asyncio.coroutine
def fibonacci(number):
do Something

@asyncio.coroutine
def binomialCoeff(n, k):
do Something

为了能并行执行这三个任务,我们将其放到一个task的list中:

if __name__ == "__main__":
tasks = [asyncio.Task(factorial(10)),
asyncio.Task(fibonacci(10)),
asyncio.Task(binomialCoeff(20, 10))]

得到事件循环:

loop = asyncio.get_event_loop()

然后运行任务:

loop.run_until_complete(asyncio.wait(tasks))

这里, asyncio.wait(tasks) 表示运行直到所有给定的协程都完成。

最后,关闭事件循环:

loop.close()

使用asyncio和futures

Asyncio 模块的另一个重要的组件是 Future 类。它和 concurrent.futures.Futures 很像,但是针对Asyncio的事件循环做了很多定制。 asyncio.Futures 类代表还未完成的结果(有可能是一个Exception)。所以综合来说,它是一种抽象,代表还没有做完的事情。

实际上,必须处理一些结果的回调函数被加入到了这个类的实例中。

future的基本方法有:

  • cancel(): 取消future的执行,调度回调函数
  • result(): 返回future代表的结果
  • exception(): 返回future中的Exception
  • add_done_callback(fn): 添加一个回调函数,当future执行的时候会调用这个回调函数
  • remove_done_callback(fn): 从“call whten done”列表中移除所有callback的实例
  • set_result(result): 将future标为执行完成,并且设置result的值
  • set_exception(exception): 将future标为执行完成,并设置Exception
import asyncio
import sys
import time


@asyncio.coroutine
def first_coroutine(future, N):
""" 前n个数的和 """
count = 0
for i in range(1, N + 1):
count += i
# yield from asyncio.sleep(4)
future.set_result("first coroutine (sum of N integers) result = {}".format(count))


@asyncio.coroutine
def second_coroutine(future, N):
""" n的阶乘 """
count = 1
for i in range(2, N + 1):
count *= i
# yield from asyncio.sleep(3)
future.set_result("second coroutine (factorial) result = {}".format(count))


def got_result(future):
print(future.result())


if __name__ == '__main__':
start = time.perf_counter()
N1 = int(sys.argv[1])
N2 = int(sys.argv[2])
loop = asyncio.get_event_loop()
future1 = asyncio.Future()
future2 = asyncio.Future()
tasks = [
first_coroutine(future1, N1),
second_coroutine(future2, N2)
]
future1.add_done_callback(got_result)
future2.add_done_callback(got_result)
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
end = time.perf_counter()
print("Total run {} seconds.".format(end - start))

欢迎关注我的其它发布渠道