异步I/O 所有的异步I/O都依赖于同一种模式.它不在于代码如何运行,而在于在何处完成等待.多路I/O操作需要统一做等待处理,于是,等待只在代码中的一个地方出现.当事件触发的时候,异步系统需要恢复等待这个事件的代码块.
接下来的问题不在于在一个地方做等待,而在于如何恢复等待接收事件的代码块.
这里有一些方法,关于如何组织一个单线程程序,所有的等待只在代码中的一个地方完成.在下面的事件循环代码中,关于resume()和waiter有一些不同的实现方法: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | read_waiters = {}
write_waiters = {}
timeout_waiters = []
def wait_for_read(fd, waiter):
read_waiters[fd] = waiter
wait_for_write = write_waiters.___setitem__
def event_loop():
while True :
readfds = read_waiters.keys()
writefds = write_waiters.keys()
read, write, error = select.select(
readfds,
writefds,
readfds + writefds,
)
for fd in read:
resume(read_waiters.pop(fd))
for fd in write:
resume(write_waiters.pop(fd))
|
我们可能希望在上面的代码中增加timeouts,在这种情况下,我们可能会写一些类似下面的代码: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | timeout_waiters = []
def wait_for_timeout(delay, waiter):
when = time.time() + delay
heapq.heappush(timeout_waiters, (when, waiter))
def event_loop():
while True :
now = time.time()
read, write, error = select.select(
rfds, wfds, efds,
timeout_waiters[ 0 ][ 0 ] - time.time()
)
while timeout_waiters:
if timeout_waiters[ 0 ][ 0 ] < = now:
_, waiter = heapq.heappop(timeout_waiters)
resume(waiter)
|
所有的异步I/O框架都是建立在同样的模型之上.只是采用了不同的方式构建代码.这样,当发出I/O操作请求的时候,可以暂停;当这个操作完成的时候,又可以恢复.
回调例子: Javascript/Node Tornado IOStream Twisted Deferred asyncio, under the hood
有一种方式是,当有数据可读的时候,只是调用一个可调用的函数. 通常,我们希望在更高层面去做处理,而不仅仅只是处理一块一块的数据.于是,我们用一个回调函数读取和分析二进制数据块,当分析数据内容的过程完成后,调用应用程序的回调函数(例如HTTP请求或是应答).
用户代码看起来像下面这样: 1 2 3 4 5 6 | def start_beer_request():
http.get( '/api/beer' , handle_response)
def handle_response(resp):
beer = load_beer(resp.json)
do_something(beer)
|
我们如何才能把响应和特定的请求关联起来呢?一种方式是使用闭包: 1 2 3 4 5 6 | def get_fruit(beer_id, callback):
def handle_response(resp):
beer = load_beer(resp.json)
callback(beer)
http.get( '/api/beer/%d' % beer_id, handle_response)
|
这两种方式都比较丑陋,特别是当我们需要关联更多的I/O调用的时候(有人喜欢这种更深层的嵌套吗?).这将会无法逃脱嵌套以及碎片化的编程.就像人们常说的,"回调就是新的GOTO语句".
回调堆栈看起来像这样: /presentations/gevent-talk/_static/callbacks.svg
注意,返回值别无用处,视觉上,唯一的传递结果的方式是编写额外的回调函数.
基于方法的回调例子: 回调简直是一团糟!但是我们可以将回调组织成接口,而这些接口的函数则自动注册为回调,这样,使用者只要继承接口去实现即可.例如,asyncio的示例代码: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | import asyncio
class EchoClient(asyncio.Protocol):
message = 'This is the message. It will be echoed.'
def connection_made( self , transport):
transport.write( self .message.encode())
print ( 'data sent: {}' . format ( self .message))
def data_received( self , data):
print ( 'data received: {}' . format (data.decode()))
def connection_lost( self , exc):
print ( 'server closed the connection' )
asyncio.get_event_loop().stop()
|
这种方式并没有解决消除回调的问题,而只是提供了一种更为简洁的框架,去避免回调散布在代码的各个地方.它设置了对回调怎么调用以及在哪里定义的限制.假设,你希望把两种协议连接起来,例如,你在处理一个来自用户的请求过程中,需要将一个HTTP请求发送至后端的REST服务.这时,你就会碰到多个回调在逻辑上碎片化的问题.同时,你也无法使用异步函数的返回值.
回调中的错误处理 当使用一个基于回调的编程模型的时候,你不得不注册额外的错误处理回调函数.
可惜的是,不是所有的框架都强化这一点.一个原因是,如果强化的话,将会使每个单独的程序晦涩难懂.于是,它是一个可选项.结果是,程序员并不是总是会(甚至经常不)这样做
这违反了 PEP20: 错误应该显示的传递,除非被显示的忽略了.
没有合理的处理错误的一个较大的风险是,程序内部状态不再是同步的状态,可能会因为等待永远不会触发的事件而死锁,或者是无限的保留一个已经断开的连接的资源.
基于生成器的协程例子: 生成器已经被用作实现类似协程的功能.它允许我们在某个事件循环系统中,在I/O操作完成后,恢复回调之后的代码块. 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | import asyncio
@asyncio .coroutine
def compute(x, y):
print ( "Compute %s + %s ..." % (x, y))
yield from asyncio.sleep( 1.0 )
return x + y
@asyncio .coroutine
def print_sum(x, y):
result = yield from compute(x, y)
print ( "%s + %s = %s" % (x, y, result))
loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum( 1 , 2 ))
loop.close()
|
(在这个例子中,需要指出的是,通过这种办法,我们希望产生其他的同步操作,以便获取一些规模效应的好处).
在PEP255中介绍生成器的时候,它们被描述为可提供类似协程的功能.PEP342和PEP380拓展了这种能力,在给生成器发送异常的同时,也可以让子生成器产生迭代.
术语"协程"意味着多于一个常规程序的系统.实际上,调用栈每次调用活跃一次.因为每个调用栈是被保留的,一个常规程序可以暂停它的状态,然后转换到一个不同的协程.在一些编程语言中,有一个yield关键字,某些方式上表现出这样的效果(和Python里面的yield关键字差别很大).
为什么你希望这么做呢?它提供了一种原始形态的多任务--合作的多任务.不像线程,它们不是抢占式的.这意味着它们不会被中断(抢占),直到显示的调用了yield.
生成器是这种行为的子集,正好契合'yield'术语.维基百科成它们是半协程.然而,生成器和协程有两个重要的不同点:
1.生成器只能迭代到调用帧 2.在迭代到调用帧的时候,栈里面的每一帧都需要协作.顶部帧可能迭代,栈里面的其他调用也来自yield. 这个调用栈看起来像下面这样: /presentations/gevent-talk/_static/generators.svg 需要指出的是,这种使用yield的方式意味着,你不能使用yield编写异步生成器,而必须返回列表.
|