设为首页收藏本站

LUPA开源社区

 找回密码
 注册
文章 帖子 博客
LUPA开源社区 首页 业界资讯 技术文摘 查看内容

gevent:轻松异步I/O

2014-9-3 09:33| 发布者: joejoe0332| 查看: 6183| 评论: 0|原作者: gones945, FreeZ|来自: oschina

摘要: 有些奇怪monkey.patch_all()的调用?不用担心,这可不像你每天打的猴子补丁(译注:monkey patching,即动态修改执行代码)。这仅仅是Python发行版恰好要打的一组猴子补丁。 ...


异步I/O

  所有的异步I/O都依赖于同一种模式.它不在于代码如何运行,而在于在何处完成等待.多路I/O操作需要统一做等待处理,于是,等待只在代码中的一个地方出现.当事件触发的时候,异步系统需要恢复等待这个事件的代码块.


  接下来的问题不在于在一个地方做等待,而在于如何恢复等待接收事件的代码块.


  这里有一些方法,关于如何组织一个单线程程序,所有的等待只在代码中的一个地方完成.在下面的事件循环代码中,关于resume()和waiter有一些不同的实现方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
read_waiters = {}
write_waiters = {}
timeout_waiters = []
 
def wait_for_read(fd, waiter):
    read_waiters[fd] = waiter
 
wait_for_write = write_waiters.___setitem__
 
def event_loop():
    while True:
        readfds = read_waiters.keys()
        writefds = write_waiters.keys()
 
        read, write, error = select.select(
            readfds,  # waiting for read
            writefds,  # waiting for write
            readfds + writefds,  # waiting for errors
        )
 
        for fd in read:
            resume(read_waiters.pop(fd))
 
        for fd in write:
            resume(write_waiters.pop(fd))
 
        # something about errors


  我们可能希望在上面的代码中增加timeouts,在这种情况下,我们可能会写一些类似下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
timeout_waiters = []
 
def wait_for_timeout(delay, waiter):
    when = time.time() + delay
    heapq.heappush(timeout_waiters, (when, waiter))
 
def event_loop():
    while True:
        now = time.time()
        read, write, error = select.select(
            rfds, wfds, efds,
            timeout_waiters[0][0- time.time()
        )
        while timeout_waiters:
            if timeout_waiters[0][0] <= now:
                 _, waiter = heapq.heappop(timeout_waiters)
                 resume(waiter)


  所有的异步I/O框架都是建立在同样的模型之上.只是采用了不同的方式构建代码.这样,当发出I/O操作请求的时候,可以暂停;当这个操作完成的时候,又可以恢复.


回调

例子:

  • Javascript/Node

  • Tornado IOStream

  • Twisted Deferred

  • asyncio, under the hood


  有一种方式是,当有数据可读的时候,只是调用一个可调用的函数. 通常,我们希望在更高层面去做处理,而不仅仅只是处理一块一块的数据.于是,我们用一个回调函数读取和分析二进制数据块,当分析数据内容的过程完成后,调用应用程序的回调函数(例如HTTP请求或是应答).


  用户代码看起来像下面这样:

1
2
3
4
5
6
def start_beer_request():
    http.get('/api/beer', handle_response)
 
def handle_response(resp):
    beer = load_beer(resp.json)
    do_something(beer)


  我们如何才能把响应和特定的请求关联起来呢?一种方式是使用闭包:

1
2
3
4
5
6
def get_fruit(beer_id, callback):
    def handle_response(resp):
        beer = load_beer(resp.json)
        callback(beer)
 
    http.get('/api/beer/%d' % beer_id, handle_response)


  这两种方式都比较丑陋,特别是当我们需要关联更多的I/O调用的时候(有人喜欢这种更深层的嵌套吗?).这将会无法逃脱嵌套以及碎片化的编程.就像人们常说的,"回调就是新的GOTO语句".


回调堆栈看起来像这样:

/presentations/gevent-talk/_static/callbacks.svg


  注意,返回值别无用处,视觉上,唯一的传递结果的方式是编写额外的回调函数.


基于方法的回调

例子:

  • Twisted Protocols

  • Tornado RequestHandler

  • asyncio Transports/Protocols

  回调简直是一团糟!但是我们可以将回调组织成接口,而这些接口的函数则自动注册为回调,这样,使用者只要继承接口去实现即可.例如,asyncio的示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import asyncio
 
class EchoClient(asyncio.Protocol):
    message = 'This is the message. It will be echoed.'
 
    def connection_made(self, transport):
        transport.write(self.message.encode())
        print('data sent: {}'.format(self.message))
 
    def data_received(self, data):
        print('data received: {}'.format(data.decode()))
 
    def connection_lost(self, exc):
        print('server closed the connection')
        asyncio.get_event_loop().stop()


  这种方式并没有解决消除回调的问题,而只是提供了一种更为简洁的框架,去避免回调散布在代码的各个地方.它设置了对回调怎么调用以及在哪里定义的限制.假设,你希望把两种协议连接起来,例如,你在处理一个来自用户的请求过程中,需要将一个HTTP请求发送至后端的REST服务.这时,你就会碰到多个回调在逻辑上碎片化的问题.同时,你也无法使用异步函数的返回值.


回调中的错误处理

  当使用一个基于回调的编程模型的时候,你不得不注册额外的错误处理回调函数.


  可惜的是,不是所有的框架都强化这一点.一个原因是,如果强化的话,将会使每个单独的程序晦涩难懂.于是,它是一个可选项.结果是,程序员并不是总是会(甚至经常不)这样做


  这违反了 PEP20:

错误应该显示的传递,除非被显示的忽略了.


  没有合理的处理错误的一个较大的风险是,程序内部状态不再是同步的状态,可能会因为等待永远不会触发的事件而死锁,或者是无限的保留一个已经断开的连接的资源.


基于生成器的协程

例子:

  • tornado.gen

  • asyncio/Tulip

  生成器已经被用作实现类似协程的功能.它允许我们在某个事件循环系统中,在I/O操作完成后,恢复回调之后的代码块.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import asyncio
 
@asyncio.coroutine
def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    yield from asyncio.sleep(1.0)
    return + y
 
@asyncio.coroutine
def print_sum(x, y):
    result = yield from compute(x, y)
    print("%s + %s = %s" % (x, y, result))
 
loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(12))
loop.close()


  (在这个例子中,需要指出的是,通过这种办法,我们希望产生其他的同步操作,以便获取一些规模效应的好处).


  在PEP255中介绍生成器的时候,它们被描述为可提供类似协程的功能.PEP342和PEP380拓展了这种能力,在给生成器发送异常的同时,也可以让子生成器产生迭代.


  术语"协程"意味着多于一个常规程序的系统.实际上,调用栈每次调用活跃一次.因为每个调用栈是被保留的,一个常规程序可以暂停它的状态,然后转换到一个不同的协程.在一些编程语言中,有一个yield关键字,某些方式上表现出这样的效果(和Python里面的yield关键字差别很大).


  为什么你希望这么做呢?它提供了一种原始形态的多任务--合作的多任务.不像线程,它们不是抢占式的.这意味着它们不会被中断(抢占),直到显示的调用了yield.


生成器是这种行为的子集,正好契合'yield'术语.维基百科成它们是半协程.然而,生成器和协程有两个重要的不同点:


1.生成器只能迭代到调用帧

2.在迭代到调用帧的时候,栈里面的每一帧都需要协作.顶部帧可能迭代,栈里面的其他调用也来自yield.

这个调用栈看起来像下面这样:

/presentations/gevent-talk/_static/generators.svg

需要指出的是,这种使用yield的方式意味着,你不能使用yield编写异步生成器,而必须返回列表.



酷毙

雷人

鲜花

鸡蛋

漂亮
  • 快毕业了,没工作经验,
    找份工作好难啊?
    赶紧去人才芯片公司磨练吧!!

最新评论

关于LUPA|人才芯片工程|人才招聘|LUPA认证|LUPA教育|LUPA开源社区 ( 浙B2-20090187 浙公网安备 33010602006705号   

返回顶部