Python epoll网络编程二:客户端模型
时间:2010-04-11 来源:zhanghm18
Python epoll网络编程二:客户端模型
第一篇简单介绍了一个使用epoll的服务器模型,本节介绍使用epoll的客户端模型,客户端连接多个服务器。
以下是发送hello请求的客户端,可以作为探测程序,探测服务器的工作状态,如果返回greeting,表示服务器正常工作。代码如下:
def usage():
print 'python epoll_client.py <host>:<port> <host>:<port> ...'
print '\texample: python epoll_client.py localhost:9999 localhost:9998'
sys.exit(1)
class HelloClient(EpollConnector):
''' extends EpolloServer, overload do_operation'''
def __init__(self, logger, srvs):
EpollConnector.__init__(self, logger, srvs)
def parse_response(self, fileno):
if self.responses[fileno] != epoll_util.greeting_response:
try:
print 'error in response %s'%str(self.connections[fileno].getpeername())
except:
print 'disconnect'
if __name__ == "__main__": srvs = [] for arg in sys.argv[1:]: try: a, p = arg.split(':') srvs.append((a, int(p))) except : usage() if len(srvs) == 0: usage() srv = HelloClient(None, srvs) srv.init_epoll() srv.loop_epoll()
EpollConnector.init_epoll(): epoll_connector.py 21 def init_epoll(self): 22 # fileno set for connection 23 self.conns_index = {} 24 self.connections = {} 25 self.requests = {} 26 self.responses = {} 27 for i in range( len( self.srvs_addr ) ): 28 try: 29 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 30 s.connect(self.srvs_addr[i]) 31 s.setblocking(0) 32 fileno = s.fileno() 33 self.epoll.register(fileno, select.EPOLLOUT | select.EPOLLET) 34 except socket.error: 35 print 'ERROR in connect to %s'%str( self.srvs_addr[i] ) 36 sys.exit(1) 37 else: 38 self.conns_index[fileno] = i 39 self.connections[fileno] = s 40 self.requests[fileno] = b'' 41 self.responses[fileno] = b''
23-26行:初始化连接信息,key均为fileno; conns_index的value是客户连接编号; connections,requests,responses 的value分别是socket和请求与相应; 29-32行:初始化客户端连接,监听EPOLLOUT;与服务器模型相反,服务器模型初始化监听EPOLLIN; 34-36行:当连接失败时,退出; 38-41行:当连接成功,初始化连接的信息;
EpollConnector.loop_epoll(): epoll_connector.py 92 def loop_epoll(self): 93 try: 94 while self.status != epoll_util.close_status: 95 events = self.epoll.poll() 96 for fileno, event in events: 97 self.do_epoll(fileno, event) 98 finally: 99 self.close_epoll()
92-99行:与服务器模型相同,处理监听事件;
EpollConnector.do_epoll(): epoll_connector.py 72 def do_epoll(self, fileno, event): 73 try: 74 if event & select.EPOLLOUT: 75 self.do_request(fileno) 76 elif event & select.EPOLLIN: 77 self.do_response(fileno) 78 elif event & select.EPOLLHUP: 79 self.hup_epoll(fileno) 80 except: 81 raise
74-75行:EPOLLOUT 可读时,调用do_request读取相应; 77-78行:EPOLLIN可读时,调用do_response读取相应; request和response是针对连接请求而言,因此客户端与服务器段的监听事件与监听动作正好相反;
EpollConnector.do_request(self, fileno): epoll_connector.py 83 def set_request(self, fileno): 84 self.requests[fileno] = epoll_util.hello_request 85 86 def do_request(self, fileno): 87 self.set_request(fileno) 88 epoll_util.send_epoll(self.connections[fileno], self.requests[fileno]) 89 self.epoll.modify(fileno, select.EPOLLIN | select.EPOLLET) 90 self.requests[fileno] = b''
87行:调用set_request(fileno) 设置请求,具体的客户端可overload此函数;
EpollConnector.do_response(self, fileno): epoll_connector.py 43 #overload 44 def parse_response(self, response): 45 if self.responses[fileno] == epoll_util.fail_response: 46 pass 47 48 def do_response(self, fileno): 49 self.responses[fileno] = b'' 50 s = self.connections[fileno] 51 self.responses[fileno] += epoll_util.recv_epoll(s) 52 self.parse_response(fileno) 53 self.epoll.modify(fileno, select.EPOLLOUT | select.EPOLLET)
52行:调用parse_response(response) 处理请求相应,具体的客户端可overload此函数;
源代码如下: 1 import socket 2 import select 3 import sys 4 import epoll_util 5 6 class EpollConnector: 7 '''generic epoll connectors which connect to down stream server''' 8 def __init__(self, logger, srvs): 9 self.logger = logger 10 self.srvs_addr = srvs 11 #status 12 self.status = epoll_util.begin_status 13 14 # initial epoll fileno 15 self.epoll = select.epoll() 16 17 def set_epoll(self, poll): 18 self.epoll.close() 19 self.epoll = poll 20 21 def init_epoll(self): 22 # fileno set for connection 23 self.conns_index = {} 24 self.connections = {} 25 self.requests = {} 26 self.responses = {} 27 for i in range( len( self.srvs_addr ) ): 28 try: 29 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 30 s.connect(self.srvs_addr[i]) 31 s.setblocking(0) 32 fileno = s.fileno() 33 self.epoll.register(fileno, select.EPOLLOUT | select.EPOLLET) 34 except socket.error: 35 print 'ERROR in connect to %s'%str( self.srvs_addr[i] ) 36 sys.exit(1) 37 else: 38 self.conns_index[fileno] = i 39 self.connections[fileno] = s 40 self.requests[fileno] = b'' 41 self.responses[fileno] = b'' 42 43 #overload 44 def parse_response(self, response): 45 if self.responses[fileno] == epoll_util.fail_response: 46 pass 47 48 def do_response(self, fileno): 49 self.responses[fileno] = b'' 50 s = self.connections[fileno] 51 self.responses[fileno] += epoll_util.recv_epoll(s) 52 self.parse_response(fileno) 53 self.epoll.modify(fileno, select.EPOLLOUT | select.EPOLLET) 54 55 #overload 56 def recover(self, fileno): 57 print 'hup_epoll', fileno 58 59 def hup_epoll(self, fileno): 60 self.recover(fileno) 61 self.epoll.unregister(fileno) 62 self.connections[fileno].close() 63 del self.connections[fileno] 64 del self.requests[fileno] 65 del self.responses[fileno] 66 del self.conns_index[fileno] 67 68 69 def close_epoll(self): 70 self.epoll.close() 71 72 def do_epoll(self, fileno, event): 73 try: 74 if event & select.EPOLLOUT: 75 self.do_request(fileno) 76 elif event & select.EPOLLIN: 77 self.do_response(fileno) 78 elif event & select.EPOLLHUP: 79 self.hup_epoll(fileno) 80 except: 81 raise 82 83 def set_request(self, fileno): 84 self.requests[fileno] = epoll_util.hello_request 85 86 def do_request(self, fileno): 87 self.set_request(fileno) 88 epoll_util.send_epoll(self.connections[fileno], self.requests[fileno]) 89 self.epoll.modify(fileno, select.EPOLLIN | select.EPOLLET) 90 self.requests[fileno] = b'' 91 92 def loop_epoll(self): 93 try: 94 while self.status != epoll_util.close_status: 95 events = self.epoll.poll() 96 for fileno, event in events: 97 self.do_epoll(fileno, event) 98 finally: 99 self.close_epoll() 100 101 102 def usage(): 103 print 'python epoll_client.py <host>:<port> <host>:<port> ...' 104 print '\texample: python epoll_client.py 218.241.108.68:9999 218.241.108.68:9998' 105 sys.exit(2) 106 107 108 class HelloClient(EpollConnector): 109 ''' extends EpolloServer, overload do_operation''' 110 def __init__(self, logger, srvs): 111 EpollConnector.__init__(self, logger, srvs) 112 113 def parse_response(self, fileno): 114 if self.responses[fileno] != epoll_util.greeting_response: 115 try: 116 print 'error in response %s'%str(self.connections[fileno].getpeername()) 117 except: 118 print 'disconnect' 119 120 121 if __name__ == "__main__": 122 srvs = [] 123 for arg in sys.argv[1:]: 124 try: 125 a, p = arg.split(':') 126 srvs.append((a, int(p))) 127 except : 128 usage() 129 if len(srvs) == 0: 130 usage() 131 srv = HelloClient(None, srvs) 132 srv.init_epoll() 133 srv.loop_epoll()
|
if __name__ == "__main__": srvs = [] for arg in sys.argv[1:]: try: a, p = arg.split(':') srvs.append((a, int(p))) except : usage() if len(srvs) == 0: usage() srv = HelloClient(None, srvs) srv.init_epoll() srv.loop_epoll()
EpollConnector.init_epoll(): epoll_connector.py 21 def init_epoll(self): 22 # fileno set for connection 23 self.conns_index = {} 24 self.connections = {} 25 self.requests = {} 26 self.responses = {} 27 for i in range( len( self.srvs_addr ) ): 28 try: 29 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 30 s.connect(self.srvs_addr[i]) 31 s.setblocking(0) 32 fileno = s.fileno() 33 self.epoll.register(fileno, select.EPOLLOUT | select.EPOLLET) 34 except socket.error: 35 print 'ERROR in connect to %s'%str( self.srvs_addr[i] ) 36 sys.exit(1) 37 else: 38 self.conns_index[fileno] = i 39 self.connections[fileno] = s 40 self.requests[fileno] = b'' 41 self.responses[fileno] = b''
23-26行:初始化连接信息,key均为fileno; conns_index的value是客户连接编号; connections,requests,responses 的value分别是socket和请求与相应; 29-32行:初始化客户端连接,监听EPOLLOUT;与服务器模型相反,服务器模型初始化监听EPOLLIN; 34-36行:当连接失败时,退出; 38-41行:当连接成功,初始化连接的信息;
EpollConnector.loop_epoll(): epoll_connector.py 92 def loop_epoll(self): 93 try: 94 while self.status != epoll_util.close_status: 95 events = self.epoll.poll() 96 for fileno, event in events: 97 self.do_epoll(fileno, event) 98 finally: 99 self.close_epoll()
92-99行:与服务器模型相同,处理监听事件;
EpollConnector.do_epoll(): epoll_connector.py 72 def do_epoll(self, fileno, event): 73 try: 74 if event & select.EPOLLOUT: 75 self.do_request(fileno) 76 elif event & select.EPOLLIN: 77 self.do_response(fileno) 78 elif event & select.EPOLLHUP: 79 self.hup_epoll(fileno) 80 except: 81 raise
74-75行:EPOLLOUT 可读时,调用do_request读取相应; 77-78行:EPOLLIN可读时,调用do_response读取相应; request和response是针对连接请求而言,因此客户端与服务器段的监听事件与监听动作正好相反;
EpollConnector.do_request(self, fileno): epoll_connector.py 83 def set_request(self, fileno): 84 self.requests[fileno] = epoll_util.hello_request 85 86 def do_request(self, fileno): 87 self.set_request(fileno) 88 epoll_util.send_epoll(self.connections[fileno], self.requests[fileno]) 89 self.epoll.modify(fileno, select.EPOLLIN | select.EPOLLET) 90 self.requests[fileno] = b''
87行:调用set_request(fileno) 设置请求,具体的客户端可overload此函数;
EpollConnector.do_response(self, fileno): epoll_connector.py 43 #overload 44 def parse_response(self, response): 45 if self.responses[fileno] == epoll_util.fail_response: 46 pass 47 48 def do_response(self, fileno): 49 self.responses[fileno] = b'' 50 s = self.connections[fileno] 51 self.responses[fileno] += epoll_util.recv_epoll(s) 52 self.parse_response(fileno) 53 self.epoll.modify(fileno, select.EPOLLOUT | select.EPOLLET)
52行:调用parse_response(response) 处理请求相应,具体的客户端可overload此函数;
源代码如下: 1 import socket 2 import select 3 import sys 4 import epoll_util 5 6 class EpollConnector: 7 '''generic epoll connectors which connect to down stream server''' 8 def __init__(self, logger, srvs): 9 self.logger = logger 10 self.srvs_addr = srvs 11 #status 12 self.status = epoll_util.begin_status 13 14 # initial epoll fileno 15 self.epoll = select.epoll() 16 17 def set_epoll(self, poll): 18 self.epoll.close() 19 self.epoll = poll 20 21 def init_epoll(self): 22 # fileno set for connection 23 self.conns_index = {} 24 self.connections = {} 25 self.requests = {} 26 self.responses = {} 27 for i in range( len( self.srvs_addr ) ): 28 try: 29 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 30 s.connect(self.srvs_addr[i]) 31 s.setblocking(0) 32 fileno = s.fileno() 33 self.epoll.register(fileno, select.EPOLLOUT | select.EPOLLET) 34 except socket.error: 35 print 'ERROR in connect to %s'%str( self.srvs_addr[i] ) 36 sys.exit(1) 37 else: 38 self.conns_index[fileno] = i 39 self.connections[fileno] = s 40 self.requests[fileno] = b'' 41 self.responses[fileno] = b'' 42 43 #overload 44 def parse_response(self, response): 45 if self.responses[fileno] == epoll_util.fail_response: 46 pass 47 48 def do_response(self, fileno): 49 self.responses[fileno] = b'' 50 s = self.connections[fileno] 51 self.responses[fileno] += epoll_util.recv_epoll(s) 52 self.parse_response(fileno) 53 self.epoll.modify(fileno, select.EPOLLOUT | select.EPOLLET) 54 55 #overload 56 def recover(self, fileno): 57 print 'hup_epoll', fileno 58 59 def hup_epoll(self, fileno): 60 self.recover(fileno) 61 self.epoll.unregister(fileno) 62 self.connections[fileno].close() 63 del self.connections[fileno] 64 del self.requests[fileno] 65 del self.responses[fileno] 66 del self.conns_index[fileno] 67 68 69 def close_epoll(self): 70 self.epoll.close() 71 72 def do_epoll(self, fileno, event): 73 try: 74 if event & select.EPOLLOUT: 75 self.do_request(fileno) 76 elif event & select.EPOLLIN: 77 self.do_response(fileno) 78 elif event & select.EPOLLHUP: 79 self.hup_epoll(fileno) 80 except: 81 raise 82 83 def set_request(self, fileno): 84 self.requests[fileno] = epoll_util.hello_request 85 86 def do_request(self, fileno): 87 self.set_request(fileno) 88 epoll_util.send_epoll(self.connections[fileno], self.requests[fileno]) 89 self.epoll.modify(fileno, select.EPOLLIN | select.EPOLLET) 90 self.requests[fileno] = b'' 91 92 def loop_epoll(self): 93 try: 94 while self.status != epoll_util.close_status: 95 events = self.epoll.poll() 96 for fileno, event in events: 97 self.do_epoll(fileno, event) 98 finally: 99 self.close_epoll() 100 101 102 def usage(): 103 print 'python epoll_client.py <host>:<port> <host>:<port> ...' 104 print '\texample: python epoll_client.py 218.241.108.68:9999 218.241.108.68:9998' 105 sys.exit(2) 106 107 108 class HelloClient(EpollConnector): 109 ''' extends EpolloServer, overload do_operation''' 110 def __init__(self, logger, srvs): 111 EpollConnector.__init__(self, logger, srvs) 112 113 def parse_response(self, fileno): 114 if self.responses[fileno] != epoll_util.greeting_response: 115 try: 116 print 'error in response %s'%str(self.connections[fileno].getpeername()) 117 except: 118 print 'disconnect' 119 120 121 if __name__ == "__main__": 122 srvs = [] 123 for arg in sys.argv[1:]: 124 try: 125 a, p = arg.split(':') 126 srvs.append((a, int(p))) 127 except : 128 usage() 129 if len(srvs) == 0: 130 usage() 131 srv = HelloClient(None, srvs) 132 srv.init_epoll() 133 srv.loop_epoll()
相关阅读 更多 +