首页 | 新闻 | 新品 | 文库 | 方案 | 视频 | 下载 | 商城 | 开发板 | 数据中心 | 座谈新版 | 培训 | 工具 | 博客 | 论坛 | 百科 | GEC | 活动 | 主题月 | 电子展
返回列表 回复 发帖

使用 Twisted Matrix 框架来进行网络编程--理解异步联网(2)

使用 Twisted Matrix 框架来进行网络编程--理解异步联网(2)

增强的服务器让我们设法增强 Twisted Weblog 服务器,以便它遵循         SocketServer-weblog.py 的模式;无须客户机重复请求即可向客户机提供新记录。这里的问题是,向         WebLog(Protocol) 方法中插入         time.sleep() 调用会导致它阻塞,因此是不允许的。在我们这样做的时候,请注意以前的服务器可能会犯错误,因为它们只向一个客户机提供每批新记录。我们猜测,如果您想允许多个客户机监控一个 Weblog,那么您也会希望它们都接收正在进行的更新。      
Twisted Matrix 在不阻塞的情况下延迟操作的方法是使用         .callLater() 方法向反应器添加回调。以此方法添加的回调被添加到提供服务的事件队列中,但只有在指定的延迟之后才会真正地对其进行处理。将这两项更改放在一起,增强的 Weblog 服务器看起来类似:      
清单 4. twisted-weblog-1.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
    from twisted.internet import reactor
from twisted.internet.protocol import Protocol, Factory
from webloglib import hit_tag, log_fields
import time
class WebLog(Protocol):
    def connectionMade(self):
        print"Connected from", self.transport.client
        self.transport.write('<hits>')
        self.ts = time.time()
        self.newHits()
    def newHits(self):
        for hit in self.factory.records:
            if self.ts <= hit[0]:
                self.transport.write(hit_tag % log_fields(hit[1]))
        self.ts = time.time()
        reactor.callLater(5, self.newHits)
    def connectionLost(self, reason):
        print"Disconnected from", self.transport.client
class WebLogFactory(Factory):
    protocol = WebLog
    def __init__(self, fname):
        self.fname = fname
        self.records = []
    def startFactory(self):
        self.fp = open(self.fname)
        self.fp.seek(0, 2) # Start at end of current access log
        self.updateRecords()
    def updateRecords(self):
        ts = time.time()
        for rec in self.fp.readlines():
            self.records.append((ts, rec))
        self.records = self.records[-100:]  # Only keep last 100 hits
        reactor.callLater(1, self.updateRecords)
    def stopFactory(self):
        self.fp.close()
if __name__=='__main__':
    reactor.listenTCP(8888, WebLogFactory('access-log'))
    reactor.run()




在这个示例中,我们定义了一个定制工厂,并将一些初始化代码从         _main_ 块移到了该工厂。还要注意的是,该服务器的客户机不需要(也不应该)休眠或发送新的请求 — 实际上,我使用的客户机应用程序就是我在 XML 技巧文章中讨论过的客户机应用程序(请参阅 )。      
工厂和协议在各自的定制方法         .updatedRecords() 和         .newHits() 中使用了相同的技术。即,如果方法想要定期运行,那么其最后一行可以调度该方法在指定的延迟以后重复运行。表面上看来,该模式很像递归 — 但它不是递归(而且重复调度不需要一定在最后一行进行,可以在您期望的地方进行调度)。例如,方法         .newHits() 简单地让控制反应器循环知道它希望再过 5 秒钟后被调用,但该方法本身却终止了。我们并不要求方法只能调度自己 — 它可以调度所期望的任何事情,如果愿意的话,也可以将工厂和协议以外的函数添加到反应器循环。      
持久性和调度除了         reactor.callLater() 调度以外,Twisted Matrix 还包含一个通用类         twisted.internet.defer.Deferred 。实际上,        延迟是对象被调度回调的泛化,但它们也允许使用诸如链接依赖回调和在这些链接中进行错误条件处理之类的技术。         Deferred 对象背后的思想是:当您调用一个方法时,我们不等待其结果(结果可能要过一会儿才出来),该方法可以立即返回一个         Deferred 对象,而反应器/调度程序稍后可以重新调用此对象,那时可望可以得到结果。      
我还没有真正地使用         Deferred 对象,但要使它们正常工作好像有些困难。如果您需要等待一个阻塞操作 — 比如,对于来自远程数据库查询的结果 — 您不会确切地知道在可以使用结果之前究竟要等待多长时间。         Deferred 对象        确实有一个超时机制,但我要在今后的文章才讨论这一机制。感兴趣的读者至少应该知道,Twisted Matrix 开发人员已经试图提供一个标准 API 来包装阻塞操作。当然,最坏的情形是回退到使用线程来进行阻塞操作,因为这些操作确实无法转换成异步回调。      
Twisted Matrix 服务器另外一个重要元素是它们对持久性提供了方便的支持。反应器是一个监控 I/O 事件并对这些事件做出响应的循环。应用程序类似于增强的反应器,能够将其状态进行 pickle 处理(即序列化),以便用于随后的重新启动。而且,可以将应用程序“有状态地”保存到“.tap”文件,并且可以使用工具         twistd 对其进行管理和监控。这里有一个简单的示例,演示了其用法(它是根据 Twisted 文档的         OneTimeKey 示例进行建模的)。该服务器将不同的 Fibonacci 数传递给所有感兴趣的客户机,而不会在它们之间重复这些数字 — 即使服务器被停止然后被启动:      
清单 5. fib_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
    from twisted.internet.app import Application
from twisted.internet.protocol import Protocol, Factory
class Fibonacci(Protocol):
    "Serve a sequence of Fibonacci numbers to all requesters"def dataReceived(self, data):
        self.factory.new = self.factory.a + self.factory.b
        self.transport.write('%d' % self.factory.new)
        self.factory.a = self.factory.b
        self.factory.b = self.factory.new
def main():
    import fib_server    # Use script as namespace
    f = Factory()
    f.protocol = fib_server.Fibonacci
    f.a, f.b = 1, 1
    application = Application("Fibonacci")
    application.listenTCP(8888, f)
    application.save()
if'__main__' == __name__:
    main()




您可以看到,我们所做的所有工作主要是用         application 替换         reactor 。虽然类         Application 也有一个         .run() 方法,但我们仍然使用其         .save() 方法来创建一个         Fibonacci.tap 文件。运行该服务器的操作如下所示:      
清单 6. 运行 fib_server.py
1
2
3
4
5
6
7
% python fib_server.py
% twistd -f Fibonacci.tap
...let server run, then shut it down...
% kill `cat twistd.pid`
...re-start server where it left off...
% twistd -f Fibonacci-shutdown.tap
...serve numbers where we left off...




连接到该服务器的客户机如果只是间歇地需要新数字,而不需要尽快地得到新数字的话,那么它应该在其循环中使用         time.sleep() 。显然,更有用的服务器可以提供更有趣的有状态数据流。      
接下来是什么?本文讨论了 TwistedMatrix 比较低级别的细节 — 定义定制协议以及其他内容。但 TwistedMatrix 存在于许多级别中 — 包括用于 Web 服务及其他公共协议的高级别模板制作。在这一系列文章的下一篇中,我们将开始具体地研究 Web 服务,并将挑选一些尚未讨论的杂项主题来进行研究。
返回列表