socketserver在内部使用IO多路复用以及多线程/进程机制,实现了并发处理多个客户端请求的socket服务端。每个客户端请求连接到服务器时,socketserver服务端都会创建一个“线程”或者“进程” 专门负责处理当前客户端的所有请求
处理器接收数据并决定如何操作。BaseRequestHandler是超类提供所有接口,其派生的子类实现这些接口的具体细节。
StreamRequestHandler和DatagramRequestHandler是BaseRequestHandler的子类,用户开发时应继承这两个子类中任一个,并重定义handle()方法
所有请求处理对象的超类。
它负责在socket层之上实现协议(i.e., HTTP, XML-RPC, or AMQP),读取数据,处理并写反应。可以重载的方法如下:
•setup():
在handle()之前的进行任何初始化操作. 默认什么都不做,继承BaseRequestHandler类时可重写setup方法
StreamRequestHandler中会创建文件类似的对象以读写socket, 因此继承StreamRequestHandler类时不能重写setup方法
•handle():
处理请求。解析传入的请求,处理数据,并发送响应。默认什么都不做。
该方法有几个实例属性可使用:
self.request:保存所有请求信息。
对于stream servers,request是socket object;对于datagram servers.request是(string , socket)
self.client_address:客户端地址
self.server:服务器实例对象,可调用服务器对象的所有方法和属性
•finish():
在handle()之后进行任何清理操作。默认什么都不做,如果setup产生异常,不会执行finish。
请求处理器子类,重写了setup()和finish()方法,并提供self.rfile 和 self.wfile属性,可分别用于读取或写入,以获得请求数据或将数据返回给客户端
self.rfile:
rfile是file-like object,用于获取客户端消息,类型为io.BufferedReader(缓冲二进制流)
read()方法: 一直读取直到遇到EOF接受
readline():会调用recv()多次,直到遇到换行符’\n’
self.wfile:
wfile是file-like object,用于向客户端发送消息, 可写interface接口
write(b)方法:会调用sent()发送数据信息,参数b必须使用bytes-like object, 支持io.BufferedIOBase
创建一个服务器,BaseServer是服务器超类:定义接口,但无具体实现
TCPServer和UDPServer是服务器子类,具体实现了其超类BaseServer的接口,用户开发程序应使用这两个子类
所有服务器对象的超类,定义了很多interface但没有具体实现,这些接口的具体实现在subclasses中
•BaseServer.fileno():返回服务器监听套接字的整数文件描述符。通常用来传递给select.select(), 以允许一个进程监视多个服务器。
•BaseServer.handlerequest():
处理单个请求。处理顺序:getrequest(), verifyrequest(), processrequest()。如果用户提供handle()方法抛出异常,将调用服务器的handleerror()方法。如果self.timeout内没有请求收到, 将调用handletimeout()并返回handle_request()。
•BaseServer.serveforever(pollinterval=0.5): 处理请求,直到一个明确的shutdown()请求。每poll_interval秒轮询一次shutdown。忽略self.timeout。如果你需要做周期性的任务,建议放置在其他线程。
•BaseServer.shutdown():告诉serve_forever()循环停止并等待其停止。python2.6版本。
•BaseServer.addressfamily: 地址家族,比如socket.AFINET和socket.AF_UNIX。
•BaseServer.RequestHandlerClass:用户提供的请求处理类,这个类为每个请求创建实例。
•BaseServer.server_address:服务器侦听的地址。格式根据协议家族地址的各不相同,请参阅socket模块的文档。
•BaseServer.socketSocket:服务器上侦听传入的请求socket对象的服务器。
服务器类支持下面的类变量:
•BaseServer.allow_reuse_address :服务器是否允许地址的重用。默认为false ,并且可在子类中更改。
•BaseServer.request_queue_size
请求队列的大小。如果单个请求需要很长的时间来处理,服务器忙时请求被放置到队列中,最多可以放requestqueuesize个。一旦队列已满,来自客户端的请求将得到 “Connection denied”错误。默认值通常为5 ,但可以被子类覆盖。
•BaseServer.sockettype:服务器使用的套接字类型; socket.SOCKSTREAM和socket.SOCK_DGRAM等。
•BaseServer.timeout:超时时间,以秒为单位,或 None表示没有超时。如果handlerequest()在timeout内没有收到请求,将调用handletimeout()。
下面方法可以被子类重载,它们对服务器对象的外部用户没有影响。
•BaseServer.server_activate():通过服务器的构造函数来激活服务器。默认的行为只是监听服务器套接字。可重载。
•BaseServer.server_bind():通过服务器的构造函数中调用绑定socket到所需的地址。可重载。
该类创建的实例对象TCP服务器,该服务器是上下文管理器
- 使用Internet TCP协议,该协议在客户端和服务器之间提供连续的数据流
服务器:
#!/usr/bin/python3
from socketserver import TCPServer as TCP, StreamRequestHandler as SRH
from time import ctime
ADDR = 'localhost', 21568
class MyRequestHandler(SRH):
'''创建请求类,处理客户请求'''
def handle(self):
# 请求处理
print(f'...connected from:{self.client_address}')
print(self.request)
self.wfile.write(f'[{ctime()}] {self.rfile.readline()}'.encode('utf8') )
with TCP(ADDR, MyRequestHandler) as tcpServ: # 创建一个服务器实例对象
tcpServ.fileno()
print('waiting for connection')
tcpServ.serve_forever() # 运行服务器,直到shutdown()
客户端(不断创建新的连接):
from socket import *
ADDR = ('localhost', 21568)
while True:
tcpCliSock = socket(AF_INET, SOCK_STREAM)
tcpCliSock.connect(ADDR)
data = input('> ')
if not data:
break
c = tcpCliSock.send(f'{data}\r\n'.encode())
print(f'c={c}')
data = tcpCliSock.recv(1024)
print(f'data={data}')
if not data:
break
print(data.strip())
tcpCliSock.close()
客户端(仅创建一个连接)
from socket import *
ADDR = ('localhost', 21568)
tcpCliSock = socket(AF_INET, SOCK_STREAM)
tcpCliSock.connect(ADDR)
while True:
data = input('> ')
if not data:
break
c = tcpCliSock.send(f'{data}\r\n'.encode())
print(f'c={c}')
data = tcpCliSock.recv(1024)
print(f'data={data}')
if not data:
break
print(data.strip())
tcpCliSock.close()
ThreadingTCPServer一切的功能都从两个父类里继承,ThreadingMixIn为它提供了多线程能力,TCPServer为它提供基本的socket通信能力
ThreadingTCPServer实现的Soket服务器内部会为每个客户端创建一个线程,该线程用来和客户端进行交互。服务器相当于一个总管,在接收连接并创建新的线程后,就撒手不管了,后面的通信就是线程和客户端之间的连接了
使用步骤:
1、 定义请求处理器类,接受客户请求并处理
2、将这个类,连同服务器的ip和端口,作为参数传递给ThreadingTCPServer()构造器
3、手动启动ThreadingTCPServer。
#!/usr/bin/python3
from socketserver import BaseRequestHandler
from socketserver import ThreadingTCPServer
import socket
from time import ctime
import threading
ADDR = 'localhost', 21568
client_addr = []
client_socket = []
class ThreadedTCPRequestHandler(BaseRequestHandler):
'''继承超类,实现自定义setup、handle、finish方法
允许地址复用、长连接、超时断开
如果是继承StreamRequestHandler子类则无法自定义setup、finish方法'''
ip = ""
port = 0
timeOut = 6 # 设置超时时间变量
def setup(self):
self.ip = self.client_address[0].strip() # 获取客户端的ip
self.port = self.client_address[1] # 获取客户端的port
self.request.settimeout(self.timeOut) # 对socket设置超时时间
print(self.ip + ":" + str(self.port) + "连接到服务器!")
client_addr.append(self.client_address) # 保存到队列中
client_socket.append(self.request) # 保存套接字socket
def handle(self):
while True: # while循环
try:
data = self.request.recv(1024).decode('utf8')
except socket.timeout: # 如果接收超时会抛出socket.timeout异常
print(self.ip + ":" + str(self.port) + "接收超时!即将断开连接!")
break # 记得跳出while循环
if data: # 判断是否接收到数据
cur_thread = threading.current_thread()
response = f"{cur_thread.name}: {data}".encode('utf8')
self.request.sendall(response)
def finish(self):
print(self.ip + ":" + str(self.port) + "断开连接!")
client_addr.remove(self.client_address)
client_socket.remove(self.request)
ThreadingTCPServer.allow_reuse_address = True # 允许地址复用
with ThreadingTCPServer(ADDR, ThreadedTCPRequestHandler) as tcpServ:
print('waiting for connection')
# print(threading.current_thread())
tcpServ.serve_forever() # 运行服务器,直到shutdown()