本文章采用socketserver.ThreadingUDPServer、socketserver.ThreadingTCPServer实现UDP收发文本、TCP收发文件的功能。
只有一个工具类文件socketUtil.py
1.ThreadingUDPServer、ThreadingTCPServer是另起两个线程来同时运行的
2.UdpRequestHandler、TcpRequestHandler里的类方法Creator(cls, *args, **kwargs)是为了将外部类当做参数传入才需要的,便于执行完网络任务后调用外部类的其他函数以做出反应,比如ClassName.onNewDataArrive()。
# coding=utf-8
import os
import time
import uuid
import json
import threading
import socket
import socketserver
class SocketUtil(object):
def __init__(self, god, udpPort=23333, tcpPort=23335):
super(SocketUtil, self).__init__()
self.god = god
self.udpPort = udpPort # 用于收发文本消息
self.tcpPort = tcpPort # 用于接收文件
self.localIP = self.getLocalIP()
# self.initUdpServerAndClient()
# self.initTcpServerAndClient()
def getLocalMacAddress(self):
'''获得本机MAC地址'''
mac = uuid.UUID(int=uuid.getnode()).hex[-12:]
return "-".join([mac[e:e + 2] for e in range(0, 11, 2)])
def getLocalHostName(self):
'''获得本机计算机名称'''
return socket.gethostname()
def getLocalIP(self):
'''获取本地主机IP'''
return self.getIPByHostName(self.getLocalHostName())
def getIPByHostName(self, remoteHostName):
'''获取域名对应的IP'''
ip = None
try:
ip = socket.gethostbyname(remoteHostName)
except socket.error as e:
print("getIPByHostName %s:%s" % (remoteHostName, e.value))
finally:
return ip
def getHostnameByIP(self, ip):
'''根据IP获取其计算机名称'''
hostname = None
try:
# gethostbyaddr返回一个包含三个元素的元组(给定地址的主要的主机名、同一IP地址的可选的主机名的一个列表、
# 关于同一主机的同一接口的其它IP地址的一个列表),比如:('Administrator', [], ['192.168.1.6'])
hostname = socket.gethostbyaddr(ip)[0]
except socket.herror as e:
print("getHostnameByIP %s:%s" % (ip, e.value))
finally:
return hostname
def ping(self, ip):
'''ping指定IP,判断是否ping得通
ping2次,等待时间为1s'''
result = os.system('ping -n 2 -w 1 %s' % ip)
if result:
return False
else:
return True
def multitask_thread(self, func, args=(), join=False, daemon=False, timeout=30):
'''启动线程执行func
func:要执行的方法对象
args:传给func的元组类型的参数。回调函数可以定义为func(*arg)或func(arg1,arg2,...)
kwargs:传给func的字典类型的参数
join:主线程是否阻塞等待子线程结束。与daemon不能同时为True
daemon:守护进程标志,表示主线程结束后子线程是否跟着结束。与join不能同时为True
timeout:主线程等待子线程的最长秒数,join=True才生效'''
# Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
t = threading.Thread(target=func, args=args, daemon=daemon)
t.start()
if join and not daemon:
t.join(timeout=timeout)
def initUdpServerAndClient(self):
self.udpClient = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.multitask_thread(self.runUdpServer, daemon=True)
def runUdpServer(self):
print('udp server start at:', self.localIP, self.udpPort)
udpServer = socketserver.ThreadingUDPServer((self.localIP, self.udpPort), UdpRequestHandler.Creator(self.god))
udpServer.serve_forever()
def sendMsgByUdp(self, dt, targetIP, targetPort=None):
'''使用udp发送消息,无需connect,直接通过主机ip和端口访问服务器
dt格式:{'from': 'client', 'to': 'server', 'type': 'txt', 'msg': 'content'}
dt = {'type': 'text', 'msg': '消息'}
dt_dumps = json.dumps(dt) # {"type": "text", "msg": "\u6d88\u606f"}
dt_encode = dt_dumps.encode('utf-8') # b'{"type": "text", "msg": "\\u6d88\\u606f"}'
dt_encode = bytes(dt_dumps, encoding='utf-8') # 转为字节流数据b'{"type": "text", "msg": "\\u6d88\\u606f"}'
dt_decode = dt_encode.decode('utf-8') # {"type": "text", "msg": "\u6d88\u606f"}
dt_loads = json.loads(dt_decode) # {'type': 'text', 'msg': '消息'}
'''
reply = (False, '发送失败。')
try:
if not targetPort:
targetPort = self.udpPort # 默认对方也使用与我方一致的端口
if targetIP:
dt_encode = json.dumps(dt).encode('utf-8')
bytesSize = self.udpClient.sendto(dt_encode, (targetIP, targetPort))
if bytesSize > 0:
reply = (True, '已发送%s字节的UDP数据。' % bytesSize)
else:
reply = (False, '发送失败:%s' % bytesSize)
else:
reply = (False, '无效的IP地址:%s' % str(targetIP))
except BaseException as e:
reply = (False, str(e))
finally:
return reply
def initTcpServerAndClient(self):
self.tcpCilent = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.multitask_thread(self.runTcpServer, daemon=True)
def runTcpServer(self):
print('tcp server start at:', self.localIP, self.tcpPort)
tcpServer = socketserver.ThreadingTCPServer((self.localIP, self.tcpPort), TcpRequestHandler.Creator(self.god))
tcpServer.serve_forever()
def sendFileByTcp(self, dt, targetIP, targetPort=None):
reply = (False, '发送失败。')
try:
if not targetPort:
targetPort = self.udpPort # 默认对方也使用与我方一致的端口
if targetIP:
# self.tcpCilent.settimeout(self.timeout) # 设置超时,单位是秒;设为None表示没有超时
self.tcpCilent.connect((targetIP, targetPort))
filePath = dt['msg']
dt['msg'] = os.path.basename(filePath) # 只传文件名
dt['size'] = os.stat(filePath)[6] # 文件大小(字节)
dt_encode = json.dumps(dt).encode('utf-8')
self.tcpClient.sendall(dt_encode)
data = self.tcpClient.recv(self.buffSize)
if data == b'ready':
print("开始发送文件:%s" % filePath)
totalBytesToSend = dt['size']
bytesSended = 0
f = open(filePath, 'rb')
while True:
data = f.read(self.buffSize)
if not data:
print("100%")
break
self.tcpClient.sendall(data)
bytesSended += self.buffSize
print('%.2f%%' % (bytesSended * 100 / totalBytesToSend))
f.close()
time.sleep(0.5) # 如果不加这一行,EOF会与上一个buffSize的data拼接在一起
self.tcpClient.sendall(b'EOF')
print("文件发送完毕:%s" % filePath)
reply = (True, '文件发送完毕:%s' % filePath)
else:
reply = (False, '对方拒绝接收文件。')
else:
reply = (False, '无效的IP地址:%s' % str(targetIP))
except BaseException as e:
reply = (False, str(e))
finally:
return reply
class UdpRequestHandler(socketserver.BaseRequestHandler):
def __init__(self, request, client_address, server, god):
self.god = god # 这一句一定要放在super().__init__()之前
super().__init__(request, client_address, server)
@classmethod
def Creator(cls, *args, **kwargs):
def _HandlerCreator(request, client_address, server):
cls(request, client_address, server, *args, **kwargs)
return _HandlerCreator
def handle(self):
try:
dt = json.loads(self.request[0].strip().decode('utf-8'))
self.god.onNewDataArrive(dt)
except BaseException as e:
self.god.log.exception(repr(e))
dt = {'type': 'error', 'msg': str(e)}
self.god.onNewErrorArrive(dt)
class TcpRequestHandler(socketserver.BaseRequestHandler):
buffSize = 32 * 1024 # 读写文件时的缓存大小。一个包的最大长度为2的16次方(65536)个字节,即64k,整型占4个字节,字符占1个字节
# udp报文最大65536字节,1个自己记录报文长度、udp包头占8字节、ip包头占20自己,还剩65507可用于报文内容,约64K,报文内容最好限制在48K以下
def __init__(self, request, client_address, server, god):
self.god = god # 这一句一定要放在super().__init__()之前
super().__init__(request, client_address, server)
@classmethod
def Creator(cls, *args, **kwargs):
def _HandlerCreator(request, client_address, server):
cls(request, client_address, server, *args, **kwargs)
return _HandlerCreator
def setup(self):
self.clientInfo = "客户端[%s:%s]" % self.client_address
print("===%s已连接到TCP服务器。" % self.clientInfo)
def finish(self):
print("===%s已断开连接。" % self.clientInfo)
def handle(self):
try:
dt = json.loads(self.request.recv(self.buffSize).decode('utf-8'))
print('handle:', dt)
if dt['type'] == 'file':
self.recvfile(dt)
else:
print('无法识别的客户端请求:', dt)
except BaseException as e:
dt = {'type': 'error', 'msg': str(e)}
self.god.onNewErrorArrive(dt)
def recvfile(self, dt):
'''totalBytesToReceive:要接收的数据的总大小'''
totalBytesToReceive = dt['size']
saveFilePath = os.path.join(self.god.dir_root_user, dt['msg'])
self.request.send(b'ready')
bytesReceived = 0 # 已接收的数据大小
f = open(saveFilePath, 'wb')
while True:
data = self.request.recv(self.buffSize)
if data == b'EOF':
print("100%")
break
f.write(data)
bytesReceived += len(data)
print('%.2f%%' % (bytesReceived * 100 / totalBytesToReceive))
f.flush()
f.close()
dt['msg'] = saveFilePath
print("Server:文件接收完毕:", dt)
self.god.onNewDataArrive(dt)
任何py文件都可以直接使用工具类SocketUtil,比如:
from socketUtil import SocketUtil
class ClassName(object):
def __init__(self):
super(ClassName, self).__init__()
self.sku = socketUtil(self)
def main(self):
self.sku.initUdpServerAndClient()
dt = {'from': 'client', 'to': 'server', 'type': 'txt', 'msg': 'content'}
self.sku.sendMsgByUdp(dt, '192.168.1.2', targetPort=23333)
self.sku.initTcpServerAndClient()
dt = {'from': 'client', 'to': 'server', 'type': 'file', 'msg': 'D:\\Adobe Fireworks CS6 Ansifa绿色精简版.7z'}
self.sku.sendFileByTcp(dt, '192.168.1.2', targetPort=23335)
def onNewDataArrive(self, dt):
print(dt)
def onNewErrorArrive(self, dt):
print(dt)
if __name__ == '__main__':
c = ClassName()
c.main()