当前位置: 首页 > 工具软件 > SocketServer > 使用案例 >

socketserver模块使用

琴修为
2023-12-01

socketserver

socketserver在内部使用IO多路复用以及多线程/进程机制,实现了并发处理多个客户端请求的socket服务端。每个客户端请求连接到服务器时,socketserver服务端都会创建一个“线程”或者“进程” 专门负责处理当前客户端的所有请求

Request Handler Objects请求处理对象

处理器接收数据并决定如何操作。BaseRequestHandler是超类提供所有接口,其派生的子类实现这些接口的具体细节。
StreamRequestHandler和DatagramRequestHandler是BaseRequestHandler的子类,用户开发时应继承这两个子类中任一个,并重定义handle()方法

class socketserver.BaseRequestHandler

所有请求处理对象的超类。
它负责在socket层之上实现协议(i.e., HTTP, XML-RPC, or AMQP),读取数据,处理并写反应。可以重载的方法如下:

•setup():
在handle()之前的进行任何初始化操作. 默认什么都不做,继承BaseRequestHandler类时可重写setup方法
StreamRequestHandler中会创建文件类似的对象以读写socket, 因此继承StreamRequestHandler类时不能重写setup方法

•handle():
处理请求。解析传入的请求,处理数据,并发送响应。默认什么都不做。
该方法有几个实例属性可使用:

self.request:保存所有请求信息。
对于stream servers,request是socket object;对于datagram servers.request是(string , socket)
self.client_address:客户端地址
self.server:服务器实例对象,可调用服务器对象的所有方法和属性

•finish():
在handle()之后进行任何清理操作。默认什么都不做,如果setup产生异常,不会执行finish。

class socketserver.StreamRequestHandler 和 class socketserver.DatagramRequestHandler

请求处理器子类,重写了setup()和finish()方法,并提供self.rfile 和 self.wfile属性,可分别用于读取或写入,以获得请求数据或将数据返回给客户端

self.rfile:
rfile是file-like object,用于获取客户端消息,类型为io.BufferedReader(缓冲二进制流)
read()方法: 一直读取直到遇到EOF接受
readline():会调用recv()多次,直到遇到换行符’\n’

self.wfile:
wfile是file-like object,用于向客户端发送消息, 可写interface接口
write(b)方法:会调用sent()发送数据信息,参数b必须使用bytes-like object, 支持io.BufferedIOBase

服务器对象

创建一个服务器,BaseServer是服务器超类:定义接口,但无具体实现
TCPServer和UDPServer是服务器子类,具体实现了其超类BaseServer的接口,用户开发程序应使用这两个子类

1. class socketserver.BaseServer(server_address, RequestHandlerClass)

所有服务器对象的超类,定义了很多interface但没有具体实现,这些接口的具体实现在subclasses中

•BaseServer.fileno():返回服务器监听套接字的整数文件描述符。通常用来传递给select.select(), 以允许一个进程监视多个服务器。

•BaseServer.handlerequest():
处理单个请求。处理顺序:getrequest(), verifyrequest(), processrequest()。如果用户提供handle()方法抛出异常,将调用服务器的handleerror()方法。如果self.timeout内没有请求收到, 将调用handletimeout()并返回handle_request()。

•BaseServer.serveforever(pollinterval=0.5): 处理请求,直到一个明确的shutdown()请求。每poll_interval秒轮询一次shutdown。忽略self.timeout。如果你需要做周期性的任务,建议放置在其他线程。

•BaseServer.shutdown():告诉serve_forever()循环停止并等待其停止。python2.6版本。

•BaseServer.addressfamily: 地址家族,比如socket.AFINET和socket.AF_UNIX。

•BaseServer.RequestHandlerClass:用户提供的请求处理类,这个类为每个请求创建实例。

•BaseServer.server_address:服务器侦听的地址。格式根据协议家族地址的各不相同,请参阅socket模块的文档。

•BaseServer.socketSocket:服务器上侦听传入的请求socket对象的服务器。

服务器类支持下面的类变量:

•BaseServer.allow_reuse_address :服务器是否允许地址的重用。默认为false ,并且可在子类中更改。

•BaseServer.request_queue_size

请求队列的大小。如果单个请求需要很长的时间来处理,服务器忙时请求被放置到队列中,最多可以放requestqueuesize个。一旦队列已满,来自客户端的请求将得到 “Connection denied”错误。默认值通常为5 ,但可以被子类覆盖。

•BaseServer.sockettype:服务器使用的套接字类型; socket.SOCKSTREAM和socket.SOCK_DGRAM等。

•BaseServer.timeout:超时时间,以秒为单位,或 None表示没有超时。如果handlerequest()在timeout内没有收到请求,将调用handletimeout()。

下面方法可以被子类重载,它们对服务器对象的外部用户没有影响。

•BaseServer.server_activate():通过服务器的构造函数来激活服务器。默认的行为只是监听服务器套接字。可重载。

•BaseServer.server_bind():通过服务器的构造函数中调用绑定socket到所需的地址。可重载。

TCPServer(server_address, RequestHandlerClass, bind_and_activate=True)

该类创建的实例对象TCP服务器,该服务器是上下文管理器

  1. 使用Internet TCP协议,该协议在客户端和服务器之间提供连续的数据流
  1. server_address: 指定要监听的ip和端口,如:(‘localhost’, 21567)。该参数被传递给超类BaseServer
  2. 构造函数自动调用server_bind(): bind the socket to the server_address
  3. 构造函数自动调用server_activate(): activate服务器,tcp服务器的默认只调用listen(), 该行为可能被覆盖
  4. RequestHandlerClass:用户提供的请求处理器类;为每个请求创建此类的一个实例。该参数被传递给超类BaseServer
代码示范

服务器:

#!/usr/bin/python3

from socketserver import TCPServer as TCP, StreamRequestHandler as SRH
from time import ctime

ADDR = 'localhost', 21568

class MyRequestHandler(SRH):
    '''创建请求类,处理客户请求'''

    def handle(self):
        # 请求处理
        print(f'...connected from:{self.client_address}')
        print(self.request)
        self.wfile.write(f'[{ctime()}] {self.rfile.readline()}'.encode('utf8') )

with TCP(ADDR, MyRequestHandler) as tcpServ: # 创建一个服务器实例对象
    tcpServ.fileno()
    print('waiting for connection')
    tcpServ.serve_forever() # 运行服务器,直到shutdown()

客户端(不断创建新的连接):

from socket import *

ADDR = ('localhost', 21568)

while True:
    tcpCliSock = socket(AF_INET, SOCK_STREAM)
    tcpCliSock.connect(ADDR)
    data = input('> ')
    if not data:
        break
    c = tcpCliSock.send(f'{data}\r\n'.encode())
    print(f'c={c}')
    data = tcpCliSock.recv(1024)
    print(f'data={data}')
    if not data:
        break
    print(data.strip())
    tcpCliSock.close()

客户端(仅创建一个连接)

from socket import *

ADDR = ('localhost', 21568)

tcpCliSock = socket(AF_INET, SOCK_STREAM)
tcpCliSock.connect(ADDR)
while True:
    data = input('> ')
    if not data:
        break
    c = tcpCliSock.send(f'{data}\r\n'.encode())
    print(f'c={c}')
    data = tcpCliSock.recv(1024)
    print(f'data={data}')
    if not data:
        break
    print(data.strip())

tcpCliSock.close()

class ThreadingTCPServer(ThreadingMixIn, TCPServer):

ThreadingTCPServer一切的功能都从两个父类里继承,ThreadingMixIn为它提供了多线程能力,TCPServer为它提供基本的socket通信能力

ThreadingTCPServer实现的Soket服务器内部会为每个客户端创建一个线程,该线程用来和客户端进行交互。服务器相当于一个总管,在接收连接并创建新的线程后,就撒手不管了,后面的通信就是线程和客户端之间的连接了

使用步骤:
1、 定义请求处理器类,接受客户请求并处理
2、将这个类,连同服务器的ip和端口,作为参数传递给ThreadingTCPServer()构造器
3、手动启动ThreadingTCPServer。

示范代码
#!/usr/bin/python3

from socketserver import BaseRequestHandler
from socketserver import ThreadingTCPServer
import socket
from time import ctime
import threading

ADDR = 'localhost', 21568

client_addr = []
client_socket = []


class ThreadedTCPRequestHandler(BaseRequestHandler):
    '''继承超类,实现自定义setup、handle、finish方法
    允许地址复用、长连接、超时断开
    如果是继承StreamRequestHandler子类则无法自定义setup、finish方法'''

    ip = ""
    port = 0
    timeOut = 6     # 设置超时时间变量

    def setup(self):
        self.ip = self.client_address[0].strip()     # 获取客户端的ip
        self.port = self.client_address[1]           # 获取客户端的port
        self.request.settimeout(self.timeOut)        # 对socket设置超时时间
        print(self.ip + ":" + str(self.port) + "连接到服务器!")
        client_addr.append(self.client_address)  # 保存到队列中
        client_socket.append(self.request)      # 保存套接字socket

    def handle(self):
        while True:  # while循环
            try:
                data = self.request.recv(1024).decode('utf8')
            except socket.timeout:  # 如果接收超时会抛出socket.timeout异常
                print(self.ip + ":" + str(self.port) + "接收超时!即将断开连接!")
                break       # 记得跳出while循环

            if data:    # 判断是否接收到数据
                cur_thread = threading.current_thread()
                response = f"{cur_thread.name}: {data}".encode('utf8')
                self.request.sendall(response)

    def finish(self):
        print(self.ip + ":" + str(self.port) + "断开连接!")
        client_addr.remove(self.client_address)
        client_socket.remove(self.request)


ThreadingTCPServer.allow_reuse_address = True # 允许地址复用

with ThreadingTCPServer(ADDR, ThreadedTCPRequestHandler) as tcpServ:
    print('waiting for connection')
    # print(threading.current_thread())
    tcpServ.serve_forever()  # 运行服务器,直到shutdown()
 类似资料: