ESP8266上用MicroPython写的WebServer

宗沛
2023-12-01

基本功能都有了,另外还实现了基本身份验证、 session 和 cookie,类似cgi的动态页

单连接工作,跑的不好。。。。可能cpu不强吧,懒得弄了,一起打包丢上来点击打开链接

import socket,time,os

Response=None
Request={}
Session={}
Cookie={}

class CResponse:
  def __init__(self,f):
    self.status=200
    self.cs=f
    self.Header={'Server':'MMPWS/0.1','Connection':'close'}#mini MicroPython Web Server 0.1
    self.s=False

  def SendHeader(self):
    global Cookie
    global Request
    ts={'200':'OK','301':'Moved Permanently','302':'Found','304':'Not Modified','401':'Unauthorized','404':'Not Found','500':'Internal Server Error'}
    if not self.s:
      xs=str(self.status)
      if xs in ts:
        self.__w('HTTP/1.1 '+xs+' '+ts[xs]+'\r\n')
      for(key,val)in self.Header.items():
        self.__w(key+': '+val+'\r\n')
      if self.status==200:
        for(key,val)in Cookie.items():
          if key in Request['Cookie']:
            if Request['Cookie'][key]!=val:
              self.__w('Set-Cookie: '+key+'='+val+';\r\n')
          elif key not in Request['Cookie']:
            self.__w('Set-Cookie: '+key+'='+val+';\r\n')
      self.__w('\r\n')
      self.s=True

  def __w(self,data):
    try:
      self.cs.write(data)
    except BaseException:
      pass

  def write(self,data):
    self.SendHeader()
    self.__w(data)

  def end(self):
    self.SendHeader()
    self.cs.close()

class WebServer:
  def __init__(self):
    self.ContentType={
      'htm':'text/html',
      'ico':'image/x-icon',
      'css':'text/css',
      'js':'application/javascript',
      'jpg':'image/jpeg',
      'gif':'image/gif',
      'png':'image/png',
      'txt':'text/plain'
    }
    self.DefaultDoc=('index.py','index.htm')
    self.AuthBasic=('','')
    self.__ss={} # Sessions

  def parse(self,istr,ch):
    result={}
    items=istr.split(ch)
    for item in items:
      if '=' in item:
        key,val=item.split('=',1)
      else:
        key=item
        val=''
      if key.strip() in result:
        if not isinstance(result[key.strip()],list):
          result[key.strip()]=[result[key.strip()]]
        result[key.strip()].append(val.strip())
      else:
        result[key.strip()]=val.strip()
    return result

  def translate_path(self,ipath,iroot):
    root=iroot
    path=ipath.split('?',1)[0]
    if root[-1:]=='/':
      root=root[:-1]
    path=path.split('#',1)[0]
    path=root+path
    if path[-1:]=='/':
      filename=path
      path=path[:-1]
    else:
      filename=''
    try:
      if os.stat(path)[0]==32768:
        filename=path
      elif len(filename)>0:
        filename=''
      else:
        filename=path+'/'
    except OSError:
      path=''
    if(len(filename)==0)and(len(path)>0):
      for fn in self.DefaultDoc:
        if fn in os.listdir(path):
          filename=path+'/'+fn
          break
    return filename

  def parse_request(self,cs,root):
    global Request
    while True:
      line=cs.readline()
      if(not line)or(line==b'\r\n'):
        if Request['method']=='POST':
          cl=int(Request['Header']['Content-Length'])
          Request['POST']=self.parse(str(cs.read(cl),'utf8'),'&')
        break
      head=str(line,'utf8').split(' ')
      if head[0] in('GET','POST'):
        Request['method']=head[0]
        if '?' in head[1]:
          Request['GET']=self.parse(head[1].split('?',1)[1],'&')
        Request['file']=self.translate_path(head[1],root)
      elif ':' in line:
        key,val=str(line,'utf8').split(':',1)
        Request['Header'][key.strip()]=val.strip()
    if 'Cookie' in Request['Header']:
      Request['Cookie']=self.parse(Request['Header']['Cookie'],';')
      Request['Header']['Cookie']=''

  def run(self,root,port,fun_idle):
    global Response
    global Request
    global Session
    global Cookie
    st=socket.socket()
    st.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
    st.bind(socket.getaddrinfo('0.0.0.0', port)[0][-1])
    st.settimeout(0.3)
    st.listen(1)
    while True:
      ErrStr=''
      for key in self.__ss.keys():
        if 'Expires' in self.__ss[key]:
          if self.__ss[key]['Expires']<time.time():
            del self.__ss[key]
      if not fun_idle(self):
        break
      try:
        cs,ca=st.accept()
        Request={'method':'','file':'','RemoteIP':ca[0],'Header':{},'GET':{},'POST':{},'Cookie':{}}
        self.parse_request(cs,root)
        Response=CResponse(cs)
      except OSError:
        continue
      sid=''
      if len(self.AuthBasic[1])>0:
        Response.Header['WWW-Authenticate']='Basic realm="'+self.AuthBasic[0]+'"'
        if 'Authorization' not in Request['Header']:
          Response.status=401
        elif Request['Header']['Authorization']!='Basic '+self.AuthBasic[1]:
          Response.status=401
      if Request['file'][-1:]=='/':
        Response.status=301
        Response.Header['Location']=Request['file'][len(root):]
      else:
        try:
          if os.stat(Request['file'])[0]==16384:
            Response.status=404
        except OSError:
          Response.status=404        
      if Response.status==200:
        ext='htm'
        if len(Request['file'])>0:
          if('.' in Request['file'].rsplit('/',1)[1]):
            ext=Request['file'].rsplit('/',1)[1].rsplit('.',1)[1].lower()
        if ext in self.ContentType:
          Response.Header['Content-Type']=self.ContentType[ext]
        if 'SESSIONID' in Request['Cookie']:
          sid=Request['Cookie']['SESSIONID']
          if isinstance(sid,list):
            sid=sid[0]
        if sid not in self.__ss:
          sid=''.join([('0'+hex(ord(os.urandom(1)))[2:])[-2:] for tint in range(16)])
          self.__ss[sid]={'Id':sid,'Expires':time.time()+1200}# Session 有效时间默认 20分钟
          Cookie['SESSIONID']=sid
        else:
          self.__ss[sid]['Expires']=time.time()+1200
        Session=self.__ss[sid]
        if ext=='py':
          try:
            exec(open(Request['file']).read(),globals())
          except SystemExit as err:
            if len(err.args)>0:
              break
          except BaseException as err:
            Response.status=500
            ErrStr=''.join([ tstr+'\n' for tstr in err.args])
        else:
          if ext in ('css','js','jpg','gif','htm','html','ico','png'):
            Response.Header['Cache-Control']='public, max-age=31536000'
          ext='"'+str(Request['file'].rsplit('/',1)[1])+','+str(os.stat(Request['file'])[6])+'"'
          Response.Header['ETag']=ext
          if 'If-None-Match' in Request['Header']:
            if Request['Header']['If-None-Match']==ext:
              Response.status=304
          if Response.status==200:
            Response.Header['Content-Length']=str(os.stat(Request['file'])[6])
            tfile=open(Request['file'],'rb')
            while True:
              data=tfile.read(256)
              if len(data)>0:
                Response.write(data)
              else:
                tfile.close()
                break
      else:
        Response.Header['Content-Type']='text/html'
      tstr=str(Response.status)
      tdic={'301':'Moved Permanently','302':'Found','304':'Not Modified','401':'Unauthorized','404':'Not Found','500':'Internal Server Error'}
      if tstr in tdic:
        Response.write(tstr+' , '+tdic[tstr])
      if Response.status==500:
        Response.write('<br>'+ErrStr+'<br>')
      Response.end()
      del Response
      cs.close()
      del cs
      Response=None
      Request={}
      Session={}
      Cookie={}
    st.close()
    del st
    self.__ss={}

 类似资料: