基本功能都有了,另外还实现了基本身份验证、 session 和 cookie,类似cgi的动态页
单连接工作,跑的不好。。。。可能cpu不强吧,懒得弄了,一起打包丢上来点击打开链接
import socket,time,os
Response=None
Request={}
Session={}
Cookie={}
class CResponse:
def __init__(self,f):
self.status=200
self.cs=f
self.Header={'Server':'MMPWS/0.1','Connection':'close'}#mini MicroPython Web Server 0.1
self.s=False
def SendHeader(self):
global Cookie
global Request
ts={'200':'OK','301':'Moved Permanently','302':'Found','304':'Not Modified','401':'Unauthorized','404':'Not Found','500':'Internal Server Error'}
if not self.s:
xs=str(self.status)
if xs in ts:
self.__w('HTTP/1.1 '+xs+' '+ts[xs]+'\r\n')
for(key,val)in self.Header.items():
self.__w(key+': '+val+'\r\n')
if self.status==200:
for(key,val)in Cookie.items():
if key in Request['Cookie']:
if Request['Cookie'][key]!=val:
self.__w('Set-Cookie: '+key+'='+val+';\r\n')
elif key not in Request['Cookie']:
self.__w('Set-Cookie: '+key+'='+val+';\r\n')
self.__w('\r\n')
self.s=True
def __w(self,data):
try:
self.cs.write(data)
except BaseException:
pass
def write(self,data):
self.SendHeader()
self.__w(data)
def end(self):
self.SendHeader()
self.cs.close()
class WebServer:
def __init__(self):
self.ContentType={
'htm':'text/html',
'ico':'image/x-icon',
'css':'text/css',
'js':'application/javascript',
'jpg':'image/jpeg',
'gif':'image/gif',
'png':'image/png',
'txt':'text/plain'
}
self.DefaultDoc=('index.py','index.htm')
self.AuthBasic=('','')
self.__ss={} # Sessions
def parse(self,istr,ch):
result={}
items=istr.split(ch)
for item in items:
if '=' in item:
key,val=item.split('=',1)
else:
key=item
val=''
if key.strip() in result:
if not isinstance(result[key.strip()],list):
result[key.strip()]=[result[key.strip()]]
result[key.strip()].append(val.strip())
else:
result[key.strip()]=val.strip()
return result
def translate_path(self,ipath,iroot):
root=iroot
path=ipath.split('?',1)[0]
if root[-1:]=='/':
root=root[:-1]
path=path.split('#',1)[0]
path=root+path
if path[-1:]=='/':
filename=path
path=path[:-1]
else:
filename=''
try:
if os.stat(path)[0]==32768:
filename=path
elif len(filename)>0:
filename=''
else:
filename=path+'/'
except OSError:
path=''
if(len(filename)==0)and(len(path)>0):
for fn in self.DefaultDoc:
if fn in os.listdir(path):
filename=path+'/'+fn
break
return filename
def parse_request(self,cs,root):
global Request
while True:
line=cs.readline()
if(not line)or(line==b'\r\n'):
if Request['method']=='POST':
cl=int(Request['Header']['Content-Length'])
Request['POST']=self.parse(str(cs.read(cl),'utf8'),'&')
break
head=str(line,'utf8').split(' ')
if head[0] in('GET','POST'):
Request['method']=head[0]
if '?' in head[1]:
Request['GET']=self.parse(head[1].split('?',1)[1],'&')
Request['file']=self.translate_path(head[1],root)
elif ':' in line:
key,val=str(line,'utf8').split(':',1)
Request['Header'][key.strip()]=val.strip()
if 'Cookie' in Request['Header']:
Request['Cookie']=self.parse(Request['Header']['Cookie'],';')
Request['Header']['Cookie']=''
def run(self,root,port,fun_idle):
global Response
global Request
global Session
global Cookie
st=socket.socket()
st.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
st.bind(socket.getaddrinfo('0.0.0.0', port)[0][-1])
st.settimeout(0.3)
st.listen(1)
while True:
ErrStr=''
for key in self.__ss.keys():
if 'Expires' in self.__ss[key]:
if self.__ss[key]['Expires']<time.time():
del self.__ss[key]
if not fun_idle(self):
break
try:
cs,ca=st.accept()
Request={'method':'','file':'','RemoteIP':ca[0],'Header':{},'GET':{},'POST':{},'Cookie':{}}
self.parse_request(cs,root)
Response=CResponse(cs)
except OSError:
continue
sid=''
if len(self.AuthBasic[1])>0:
Response.Header['WWW-Authenticate']='Basic realm="'+self.AuthBasic[0]+'"'
if 'Authorization' not in Request['Header']:
Response.status=401
elif Request['Header']['Authorization']!='Basic '+self.AuthBasic[1]:
Response.status=401
if Request['file'][-1:]=='/':
Response.status=301
Response.Header['Location']=Request['file'][len(root):]
else:
try:
if os.stat(Request['file'])[0]==16384:
Response.status=404
except OSError:
Response.status=404
if Response.status==200:
ext='htm'
if len(Request['file'])>0:
if('.' in Request['file'].rsplit('/',1)[1]):
ext=Request['file'].rsplit('/',1)[1].rsplit('.',1)[1].lower()
if ext in self.ContentType:
Response.Header['Content-Type']=self.ContentType[ext]
if 'SESSIONID' in Request['Cookie']:
sid=Request['Cookie']['SESSIONID']
if isinstance(sid,list):
sid=sid[0]
if sid not in self.__ss:
sid=''.join([('0'+hex(ord(os.urandom(1)))[2:])[-2:] for tint in range(16)])
self.__ss[sid]={'Id':sid,'Expires':time.time()+1200}# Session 有效时间默认 20分钟
Cookie['SESSIONID']=sid
else:
self.__ss[sid]['Expires']=time.time()+1200
Session=self.__ss[sid]
if ext=='py':
try:
exec(open(Request['file']).read(),globals())
except SystemExit as err:
if len(err.args)>0:
break
except BaseException as err:
Response.status=500
ErrStr=''.join([ tstr+'\n' for tstr in err.args])
else:
if ext in ('css','js','jpg','gif','htm','html','ico','png'):
Response.Header['Cache-Control']='public, max-age=31536000'
ext='"'+str(Request['file'].rsplit('/',1)[1])+','+str(os.stat(Request['file'])[6])+'"'
Response.Header['ETag']=ext
if 'If-None-Match' in Request['Header']:
if Request['Header']['If-None-Match']==ext:
Response.status=304
if Response.status==200:
Response.Header['Content-Length']=str(os.stat(Request['file'])[6])
tfile=open(Request['file'],'rb')
while True:
data=tfile.read(256)
if len(data)>0:
Response.write(data)
else:
tfile.close()
break
else:
Response.Header['Content-Type']='text/html'
tstr=str(Response.status)
tdic={'301':'Moved Permanently','302':'Found','304':'Not Modified','401':'Unauthorized','404':'Not Found','500':'Internal Server Error'}
if tstr in tdic:
Response.write(tstr+' , '+tdic[tstr])
if Response.status==500:
Response.write('<br>'+ErrStr+'<br>')
Response.end()
del Response
cs.close()
del cs
Response=None
Request={}
Session={}
Cookie={}
st.close()
del st
self.__ss={}