http.server — 实现 Web Servers 的基础工具
优质
小牛编辑
123浏览
2023-12-01
HTTP GET
# http_server_GET.py
from http.server import BaseHTTPRequestHandler
from urllib import parse
class GetHandler(BaseHTTPRequestHandler):
def do_GET(self):
parsed_path = parse.urlparse(self.path)
message_parts = [
'CLIENT VALUES:',
'client_address={} ({})'.format(
self.client_address,
self.address_string()),
'command={}'.format(self.command),
'path={}'.format(self.path),
'real path={}'.format(parsed_path.path),
'query={}'.format(parsed_path.query),
'request_version={}'.format(self.request_version),
'',
'SERVER VALUES:',
'server_version={}'.format(self.server_version),
'sys_version={}'.format(self.sys_version),
'protocol_version={}'.format(self.protocol_version),
'',
'HEADERS RECEIVED:',
]
for name, value in sorted(self.headers.items()):
message_parts.append(
'{}={}'.format(name, value.rstrip())
)
message_parts.append('')
message = '\r\n'.join(message_parts)
self.send_response(200)
self.send_header('Content-Type',
'text/plain; charset=utf-8')
self.end_headers()
self.wfile.write(message.encode('utf-8'))
if __name__ == '__main__':
from http.server import HTTPServer
server = HTTPServer(('localhost', 8080), GetHandler)
print('Starting server, use <Ctrl-C> to stop')
server.serve_forever()
HTTP POST
# http_server_POST.py
import cgi
from http.server import BaseHTTPRequestHandler
import io
class PostHandler(BaseHTTPRequestHandler):
def do_POST(self):
# Parse the form data posted
form = cgi.FieldStorage(
fp=self.rfile,
headers=self.headers,
environ={
'REQUEST_METHOD': 'POST',
'CONTENT_TYPE': self.headers['Content-Type'],
}
)
# Begin the response
self.send_response(200)
self.send_header('Content-Type',
'text/plain; charset=utf-8')
self.end_headers()
out = io.TextIOWrapper(
self.wfile,
encoding='utf-8',
line_buffering=False,
write_through=True,
)
out.write('Client: {}\n'.format(self.client_address))
out.write('User-agent: {}\n'.format(
self.headers['user-agent']))
out.write('Path: {}\n'.format(self.path))
out.write('Form data:\n')
# Echo back information about what was posted in the form
for field in form.keys():
field_item = form[field]
if field_item.filename:
# The field contains an uploaded file
file_data = field_item.file.read()
file_len = len(file_data)
del file_data
out.write(
'\tUploaded {} as {!r} ({} bytes)\n'.format(
field, field_item.filename, file_len)
)
else:
# Regular form value
out.write('\t{}={}\n'.format(
field, form[field].value))
# Disconnect our encoding wrapper from the underlying
# buffer so that deleting the wrapper doesn't close
# the socket, which is still being used by the server.
out.detach()
if __name__ == '__main__':
from http.server import HTTPServer
server = HTTPServer(('localhost', 8080), PostHandler)
print('Starting server, use <Ctrl-C> to stop')
server.serve_forever()
Threading and Forking
# http_server_threads.py
from http.server import HTTPServer, BaseHTTPRequestHandler
from socketserver import ThreadingMixIn
import threading
class Handler(BaseHTTPRequestHandler):
def do_GET(self):
self.send_response(200)
self.send_header('Content-Type',
'text/plain; charset=utf-8')
self.end_headers()
message = threading.currentThread().getName()
self.wfile.write(message.encode('utf-8'))
self.wfile.write(b'\n')
class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
"""Handle requests in a separate thread."""
if __name__ == '__main__':
server = ThreadedHTTPServer(('localhost', 8080), Handler)
print('Starting server, use <Ctrl-C> to stop')
server.serve_forever()
错误处理
# http_server_errors.py
from http.server import BaseHTTPRequestHandler
class ErrorHandler(BaseHTTPRequestHandler):
def do_GET(self):
self.send_error(404)
if __name__ == '__main__':
from http.server import HTTPServer
server = HTTPServer(('localhost', 8080), ErrorHandler)
print('Starting server, use <Ctrl-C> to stop')
server.serve_forever()
设置头
# http_server_send_header.py
from http.server import BaseHTTPRequestHandler
import time
class GetHandler(BaseHTTPRequestHandler):
def do_GET(self):
self.send_response(200)
self.send_header(
'Content-Type',
'text/plain; charset=utf-8',
)
self.send_header(
'Last-Modified',
self.date_time_string(time.time())
)
self.end_headers()
self.wfile.write('Response body\n'.encode('utf-8'))
if __name__ == '__main__':
from http.server import HTTPServer
server = HTTPServer(('localhost', 8080), GetHandler)
print('Starting server, use <Ctrl-C> to stop')
server.serve_forever()