Python epoll网络编程一:服务器模型
时间:2010-04-11 来源:zhanghm18
Python epoll网络编程一:服务器模型
python因为GIL的限制,多线程模型无法利用多个核CPU;而采用select/poll/epoll同步非阻塞IO模型,可达到一个线程服务多个请求连接。
第一部分给出一个服务器模型,代码如下,之后还会给出客户端模型,和过滤器模型等。
|
如下回显服务器,只要继承EpollServer, 并且overload函数do_operation即可,代码片段如下:
def usage():
print 'python epoll_server.py <port_number>'
sys.exit(2)
class EchoSrv(EpollServer):
''' extends EpolloServer, overload do_operation'''
def __init__(self, port, logger):
EpollServer.__init__(self, port)
def do_operation(self, fileno):
self.responses[fileno] = self.requests[fileno]
if __name__ == "__main__":
import sys, getopt
try:
port = int(sys.argv[1])
except:
usage()
srv = EchoSrv(port)
srv.init_epoll()
srv.loop_epoll()
EpollServer类的初始化方法代码如下:
epoll_server.py
7 def __init__(self, port, logger):
8 self.port = port
9 self.logger = logger
10 self.status = epoll_util.begin_status
11 # initial epoll fileno
12 self.epoll = select.epoll()
9行:初始化日志
10行:初始化服务器状态,详细状态见下文的 epoll_util.py文件
12行:初始化epoll描述字
EpollServer类的init_epoll()代码如下:
epoll_server.py
14 def init_epoll(self):
15 # initial epoll server socket
16 self.srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
17 self.srv.bind(('', self.port))
18 self.srv.listen(1)
19 self.srv.setblocking(0)
20
21 self.epoll.register(self.srv.fileno(), select.EPOLLIN | select.EPOLLET)
22 # fileno set for client connection
23 self.connections = {}
24 self.requests = {}
25 self.responses = {}
16-19行:初始化服务器非阻塞被动连接socket,
21行:注册被动连接socket的 EPOLLIN事件,并且设置为ET电平触发
23-25行:客户端对端主动连接信息,socket连接self.connections,socket请求self.requests和socket响应self.responses,且均为socket的fileno为key的dict。
EpollServer类的loop_epoll()代码如下:
epoll_server.py
100 def loop_epoll(self):
101 try:
102 while self.status != epoll_util.close_status:
103 events = self.epoll.poll()
104 for fileno, event in events:
105 #print fileno, event
106 self.do_epoll(fileno, event)
107 finally:
108 self.close_epoll()
102-103行:当服务器状态为达到终止状态时,反复监听epoll所注册的socket描述字;
104-106行:对epoll返回的描述字fileno和事件event,调用 self.do_epoll(fileno, event) 完成相应的动作;
107-108行:当有异常发生时,调用self.close_epoll()结束监听。
EpollServer类的do_epoll(fileno, event) 代码如下:
epoll_server.py
87 def do_epoll(self, fileno, event):
88 try:
89 if fileno == self.srv.fileno():
90 self.accept_epoll()
91 elif event & select.EPOLLIN:
92 self.do_request(fileno)
93 elif event & select.EPOLLOUT:
94 self.do_response(fileno)
95 elif event & select.EPOLLHUP:
96 self.hup_epoll(fileno)
97 except:
98 raise
89-90行:当事件来自于服务器被动连接socket时,表示有客户端连接请求到达,调用self.accept_epoll()
接受新的客户端连接,并且注册此连接的EPOLLIN事件;
91-92行:当事件来自服务器端与客户端的链接socket,并且是EPOLLIN可读事件到达,调用do_request(fileno),处理可读事件;
93-94行:当事件来自服务器端与客户端的链接socket,并且是EPOLLOUT可写事件到达,调用do_response(fileno),处理可读事件;
95-96行:当异常事件来自服务器端与客户端的链接socket,调用self.hup_epoll(fileno), 关闭此连接。
EpollServer类的accept_epoll(fileno) 代码如下:
epoll_server.py
31 def accept_epoll(self):
32 try:
33 while True:
34 connection, address = self.srv.accept()
35 connection.setblocking(0)
36 self.epoll.register(connection.fileno(), select.EPOLLIN | select.EPOLLHUP | select.EPOLLET)
37 self.connections[connection.fileno()] = connection
38 self.requests[connection.fileno()] = b''
39 self.responses[connection.fileno()] = b''
40 except socket.error:
41 pass
34-36行:服务器端接受客户端连接请求,并以ET模式注册EPOLLIN事件;
37-38行:初始化改连接的相关数据结构;
EpollServer类的do_request (fileno)为服务器程序的服务逻辑处理单元,代码如下:
epoll_server.py
49 def do_request(self, fileno):
50 self.requests[fileno] += epoll_util.recv_epoll(self.connections[fileno])
51 request = self.requests[fileno].strip()
52 if request == epoll_util.hello_request:
53 self.responses[fileno] = epoll_util.greeting_response
54 elif request == epoll_util.quit_request:
55 self.hup_epoll(fileno)
56 return
57 elif request == epoll_util.close_request:
58 self.hup_epoll(fileno)
59 self.status = epoll_util.close_status
60 return
61 else:
62 self.do_operation(fileno)
63 self.epoll.modify(fileno, select.EPOLLOUT | select.EPOLLET)
50行:调用 epoll_util.recv_epoll(connection)方法读取请求报文;
52行-62行:对于不同的请求,有不同的动作,如当请求是hello时,将response设置greeting;请求是quit,服务器端断开客户端连接;请求是close时,断开客户端连接,并关闭服务器;当请求是其他时,调用self.do_operation(fileno)完成其他逻辑,默认是将response设置为done;报文常量见epoll_util.py文件;
63行:完成处理逻辑后,监听响应EPOLLOUT写事件。
EpollServer类的 do_response (fileno)返回response,代码如下:
epoll_server.py
70 def do_response(self, fileno):
71 epoll_util.send_epoll(self.connections[fileno], self.responses[fileno])
72 self.requests[fileno] = b''
73 self.epoll.modify(fileno, select.EPOLLIN | select.EPOLLET)
71行:调用 epoll_util.send_epoll(connection, response)方法发送响应报文;
72行:清空请求,self.requests[fileno]
73行:监听请求EPOLLIN读事件。
EpollServer类的 hup_epoll (fileno)处理挂起事件,代码如下:
epoll_server.py
75 def hup_epoll(self, fileno):
76 self.epoll.unregister(fileno)
77 self.connections[fileno].close()
78 del self.connections[fileno]
79 del self.requests[fileno]
80 del self.responses[fileno]
75行:解除epoll对此描述字的监听;
76行:关闭此描述字;
78-80行:清空此描述字的其他信息;
EpollServer类的 close_epoll (fileno)关闭服务器监听,代码如下:
epoll_server.py
82 def close_epoll(self):
83 self.epoll.unregister(self.srv.fileno())
84 self.srv.close()
代码如下: 1 import socket 2 import select 3 import epoll_util 4 5 class EpollServer: 6 '''generic epoll server which accept connection from client socket''' 7 def __init__(self, port, logger): 8 self.port = port 9 self.logger = logger 10 self.status = epoll_util.begin_status 11 # initial epoll fileno 12 self.epoll = select.epoll() 13 14 def init_epoll(self): 15 # initial epoll server socket 16 self.srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 17 self.srv.bind(('', self.port)) 18 self.srv.listen(1) 19 self.srv.setblocking(0) 20 21 self.epoll.register(self.srv.fileno(), select.EPOLLIN | select.EPOLLET) 22 # fileno set for client connection 23 self.connections = {} 24 self.requests = {} 25 self.responses = {} 26 27 def set_epoll(self, poll): 28 self.epoll.close() 29 self.epoll = poll 30 31 def accept_epoll(self): 32 try: 33 while True: 34 connection, address = self.srv.accept() 35 connection.setblocking(0) 36 self.epoll.register(connection.fileno(), select.EPOLLIN | select.EPOLLHUP | select.EPOLLET) 37 self.connections[connection.fileno()] = connection 38 self.requests[connection.fileno()] = b'' 39 self.responses[connection.fileno()] = b'' 40 except socket.error: 41 pass 42 43 def do_operation(self, fileno): 44 self.responses[fileno] = epoll_util.done_response 45 46 def is_ready(self, fileno): 47 pass 48 49 def do_request(self, fileno): 50 self.requests[fileno] += epoll_util.recv_epoll(self.connections[fileno]) 51 request = self.requests[fileno].strip() 52 if request == epoll_util.hello_request: 53 self.responses[fileno] = epoll_util.greeting_response 54 elif request == epoll_util.quit_request: 55 self.hup_epoll(fileno) 56 return 57 elif request == epoll_util.close_request: 58 self.hup_epoll(fileno) 59 self.status = epoll_util.close_status 60 return 61 else: 62 self.do_operation(fileno) 63 self.epoll.modify(fileno, select.EPOLLOUT | select.EPOLLET) 64 65 def do_clear(self, fileno): 66 self.responses[fileno] = epoll_util.done_response 67 print 'do clear' 68 69 70 def do_response(self, fileno): 71 epoll_util.send_epoll(self.connections[fileno], self.responses[fileno]) 72 self.requests[fileno] = b'' 73 self.epoll.modify(fileno, select.EPOLLIN | select.EPOLLET) 74 75 def hup_epoll(self, fileno): 76 self.epoll.unregister(fileno) 77 self.connections[fileno].close() 78 del self.connections[fileno] 79 del self.requests[fileno] 80 del self.responses[fileno] 81 82 def close_epoll(self): 83 self.epoll.unregister(self.srv.fileno()) 84 self.srv.close() 85 self.epoll.close() 86 87 def do_epoll(self, fileno, event): 88 try: 89 if fileno == self.srv.fileno(): 90 self.accept_epoll() 91 elif event & select.EPOLLIN: 92 self.do_request(fileno) 93 elif event & select.EPOLLOUT: 94 self.do_response(fileno) 95 elif event & select.EPOLLHUP: 96 self.hup_epoll(fileno) 97 except: 98 raise 99 100 def loop_epoll(self): 101 try: 102 while self.status != epoll_util.close_status: 103 events = self.epoll.poll() 104 for fileno, event in events: 105 #print fileno, event 106 self.do_epoll(fileno, event) 107 finally: 108 self.close_epoll() 109 110 111 def usage(): 112 print 'python epoll_server.py <port_number>' 113 sys.exit(2) 114 115 116 class EchoSrv(EpollServer): 117 ''' extends EpolloServer, overload do_operation''' 118 def __init__(self, port, logger): 119 EpollServer.__init__(self, port, logger) 120 121 def do_operation(self, fileno): 122 self.responses[fileno] = self.requests[fileno] 123 124 if __name__ == "__main__": 125 import sys, getopt 126 try: 127 port = int(sys.argv[1]) 128 except: 129 usage() 130 srv = EchoSrv(port, None) 131 srv.init_epoll() 132 srv.loop_epoll()