当前位置: 首页 > 工具软件 > SocketServer > 使用案例 >

使用socketserver的ThreadingUDPServer、ThreadingTCPServer实现收发消息和文件

终睿
2023-12-01

本文章采用socketserver.ThreadingUDPServer、socketserver.ThreadingTCPServer实现UDP收发文本、TCP收发文件的功能。

只有一个工具类文件socketUtil.py

1.ThreadingUDPServer、ThreadingTCPServer是另起两个线程来同时运行的
2.UdpRequestHandler、TcpRequestHandler里的类方法Creator(cls, *args, **kwargs)是为了将外部类当做参数传入才需要的,便于执行完网络任务后调用外部类的其他函数以做出反应,比如ClassName.onNewDataArrive()。

# coding=utf-8
import os
import time
import uuid
import json
import threading
import socket
import socketserver


class SocketUtil(object):

    def __init__(self, god, udpPort=23333, tcpPort=23335):
        super(SocketUtil, self).__init__()
        self.god = god
        self.udpPort = udpPort  # 用于收发文本消息
        self.tcpPort = tcpPort  # 用于接收文件
        self.localIP = self.getLocalIP()
        # self.initUdpServerAndClient()
        # self.initTcpServerAndClient()

    def getLocalMacAddress(self):
        '''获得本机MAC地址'''
        mac = uuid.UUID(int=uuid.getnode()).hex[-12:]
        return "-".join([mac[e:e + 2] for e in range(0, 11, 2)])

    def getLocalHostName(self):
        '''获得本机计算机名称'''
        return socket.gethostname()

    def getLocalIP(self):
        '''获取本地主机IP'''
        return self.getIPByHostName(self.getLocalHostName())

    def getIPByHostName(self, remoteHostName):
        '''获取域名对应的IP'''
        ip = None
        try:
            ip = socket.gethostbyname(remoteHostName)
        except socket.error as e:
            print("getIPByHostName %s:%s" % (remoteHostName, e.value))
        finally:
            return ip

    def getHostnameByIP(self, ip):
        '''根据IP获取其计算机名称'''
        hostname = None
        try:
            # gethostbyaddr返回一个包含三个元素的元组(给定地址的主要的主机名、同一IP地址的可选的主机名的一个列表、
            # 关于同一主机的同一接口的其它IP地址的一个列表),比如:('Administrator', [], ['192.168.1.6'])
            hostname = socket.gethostbyaddr(ip)[0]
        except socket.herror as e:
            print("getHostnameByIP %s:%s" % (ip, e.value))
        finally:
            return hostname

    def ping(self, ip):
        '''ping指定IP,判断是否ping得通
        ping2次,等待时间为1s'''
        result = os.system('ping -n 2 -w 1 %s' % ip)
        if result:
            return False
        else:
            return True

    def multitask_thread(self, func, args=(), join=False, daemon=False, timeout=30):
        '''启动线程执行func
        func:要执行的方法对象
        args:传给func的元组类型的参数。回调函数可以定义为func(*arg)或func(arg1,arg2,...)
        kwargs:传给func的字典类型的参数
        join:主线程是否阻塞等待子线程结束。与daemon不能同时为True
        daemon:守护进程标志,表示主线程结束后子线程是否跟着结束。与join不能同时为True
        timeout:主线程等待子线程的最长秒数,join=True才生效'''
        # Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
        t = threading.Thread(target=func, args=args, daemon=daemon)
        t.start()
        if join and not daemon:
            t.join(timeout=timeout)

    def initUdpServerAndClient(self):
        self.udpClient = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.multitask_thread(self.runUdpServer, daemon=True)

    def runUdpServer(self):
        print('udp server start at:', self.localIP, self.udpPort)
        udpServer = socketserver.ThreadingUDPServer((self.localIP, self.udpPort), UdpRequestHandler.Creator(self.god))
        udpServer.serve_forever()

    def sendMsgByUdp(self, dt, targetIP, targetPort=None):
        '''使用udp发送消息,无需connect,直接通过主机ip和端口访问服务器
        dt格式:{'from': 'client', 'to': 'server', 'type': 'txt', 'msg': 'content'}
        dt = {'type': 'text', 'msg': '消息'}
        dt_dumps = json.dumps(dt)  # {"type": "text", "msg": "\u6d88\u606f"}
        dt_encode = dt_dumps.encode('utf-8')  # b'{"type": "text", "msg": "\\u6d88\\u606f"}'
        dt_encode = bytes(dt_dumps, encoding='utf-8')  # 转为字节流数据b'{"type": "text", "msg": "\\u6d88\\u606f"}'
        dt_decode = dt_encode.decode('utf-8')  # {"type": "text", "msg": "\u6d88\u606f"}
        dt_loads = json.loads(dt_decode)  # {'type': 'text', 'msg': '消息'}
        '''
        reply = (False, '发送失败。')
        try:
            if not targetPort:
                targetPort = self.udpPort  # 默认对方也使用与我方一致的端口
            if targetIP:
                dt_encode = json.dumps(dt).encode('utf-8')
                bytesSize = self.udpClient.sendto(dt_encode, (targetIP, targetPort))
                if bytesSize > 0:
                    reply = (True, '已发送%s字节的UDP数据。' % bytesSize)
                else:
                    reply = (False, '发送失败:%s' % bytesSize)
            else:
                reply = (False, '无效的IP地址:%s' % str(targetIP))
        except BaseException as e:
            reply = (False, str(e))
        finally:
            return reply

    def initTcpServerAndClient(self):
        self.tcpCilent = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.multitask_thread(self.runTcpServer, daemon=True)

    def runTcpServer(self):
        print('tcp server start at:', self.localIP, self.tcpPort)
        tcpServer = socketserver.ThreadingTCPServer((self.localIP, self.tcpPort), TcpRequestHandler.Creator(self.god))
        tcpServer.serve_forever()

    def sendFileByTcp(self, dt, targetIP, targetPort=None):
        reply = (False, '发送失败。')
        try:
            if not targetPort:
                targetPort = self.udpPort  # 默认对方也使用与我方一致的端口
            if targetIP:
                # self.tcpCilent.settimeout(self.timeout)  # 设置超时,单位是秒;设为None表示没有超时
                self.tcpCilent.connect((targetIP, targetPort))
                filePath = dt['msg']
                dt['msg'] = os.path.basename(filePath)  # 只传文件名
                dt['size'] = os.stat(filePath)[6]  # 文件大小(字节)
                dt_encode = json.dumps(dt).encode('utf-8')
                self.tcpClient.sendall(dt_encode)
                data = self.tcpClient.recv(self.buffSize)
                if data == b'ready':
                    print("开始发送文件:%s" % filePath)
                    totalBytesToSend = dt['size']
                    bytesSended = 0
                    f = open(filePath, 'rb')
                    while True:
                        data = f.read(self.buffSize)
                        if not data:
                            print("100%")
                            break
                        self.tcpClient.sendall(data)
                        bytesSended += self.buffSize
                        print('%.2f%%' % (bytesSended * 100 / totalBytesToSend))
                    f.close()
                    time.sleep(0.5)  # 如果不加这一行,EOF会与上一个buffSize的data拼接在一起
                    self.tcpClient.sendall(b'EOF')
                    print("文件发送完毕:%s" % filePath)
                    reply = (True, '文件发送完毕:%s' % filePath)
                else:
                    reply = (False, '对方拒绝接收文件。')
            else:
                reply = (False, '无效的IP地址:%s' % str(targetIP))
        except BaseException as e:
            reply = (False, str(e))
        finally:
            return reply


class UdpRequestHandler(socketserver.BaseRequestHandler):

    def __init__(self, request, client_address, server, god):
        self.god = god  # 这一句一定要放在super().__init__()之前
        super().__init__(request, client_address, server)

    @classmethod
    def Creator(cls, *args, **kwargs):
        def _HandlerCreator(request, client_address, server):
            cls(request, client_address, server, *args, **kwargs)
        return _HandlerCreator

    def handle(self):
        try:
            dt = json.loads(self.request[0].strip().decode('utf-8'))
            self.god.onNewDataArrive(dt)
        except BaseException as e:
            self.god.log.exception(repr(e))
            dt = {'type': 'error', 'msg': str(e)}
            self.god.onNewErrorArrive(dt)


class TcpRequestHandler(socketserver.BaseRequestHandler):

    buffSize = 32 * 1024  # 读写文件时的缓存大小。一个包的最大长度为2的16次方(65536)个字节,即64k,整型占4个字节,字符占1个字节
    # udp报文最大65536字节,1个自己记录报文长度、udp包头占8字节、ip包头占20自己,还剩65507可用于报文内容,约64K,报文内容最好限制在48K以下

    def __init__(self, request, client_address, server, god):
        self.god = god  # 这一句一定要放在super().__init__()之前
        super().__init__(request, client_address, server)

    @classmethod
    def Creator(cls, *args, **kwargs):
        def _HandlerCreator(request, client_address, server):
            cls(request, client_address, server, *args, **kwargs)
        return _HandlerCreator

    def setup(self):
        self.clientInfo = "客户端[%s:%s]" % self.client_address
        print("===%s已连接到TCP服务器。" % self.clientInfo)

    def finish(self):
        print("===%s已断开连接。" % self.clientInfo)

    def handle(self):
        try:
            dt = json.loads(self.request.recv(self.buffSize).decode('utf-8'))
            print('handle:', dt)
            if dt['type'] == 'file':
                self.recvfile(dt)
            else:
                print('无法识别的客户端请求:', dt)
        except BaseException as e:
            dt = {'type': 'error', 'msg': str(e)}
            self.god.onNewErrorArrive(dt)

    def recvfile(self, dt):
        '''totalBytesToReceive:要接收的数据的总大小'''
        totalBytesToReceive = dt['size']
        saveFilePath = os.path.join(self.god.dir_root_user, dt['msg'])
        self.request.send(b'ready')
        bytesReceived = 0  # 已接收的数据大小
        f = open(saveFilePath, 'wb')
        while True:
            data = self.request.recv(self.buffSize)
            if data == b'EOF':
                print("100%")
                break
            f.write(data)
            bytesReceived += len(data)
            print('%.2f%%' % (bytesReceived * 100 / totalBytesToReceive))
        f.flush()
        f.close()
        dt['msg'] = saveFilePath
        print("Server:文件接收完毕:", dt)
        self.god.onNewDataArrive(dt)

任何py文件都可以直接使用工具类SocketUtil,比如:

from socketUtil import SocketUtil


class ClassName(object):

    def __init__(self):
        super(ClassName, self).__init__()
        self.sku = socketUtil(self)

    def main(self):
        self.sku.initUdpServerAndClient()
        dt = {'from': 'client', 'to': 'server', 'type': 'txt', 'msg': 'content'}
        self.sku.sendMsgByUdp(dt, '192.168.1.2', targetPort=23333)

        self.sku.initTcpServerAndClient()
        dt = {'from': 'client', 'to': 'server', 'type': 'file', 'msg': 'D:\\Adobe Fireworks CS6 Ansifa绿色精简版.7z'}
        self.sku.sendFileByTcp(dt, '192.168.1.2', targetPort=23335)

    def onNewDataArrive(self, dt):
        print(dt)

    def onNewErrorArrive(self, dt):
        print(dt)


if __name__ == '__main__':
    c = ClassName()
    c.main()

 类似资料: