xmlrpc.server — XML-RPC 的服务端库
优质
小牛编辑
126浏览
2023-12-01
简单服务
# xmlrpc_function.py
from xmlrpc.server import SimpleXMLRPCServer
import logging
import os
# Set up logging
logging.basicConfig(level=logging.INFO)

# Listen on the loopback interface, port 9000, with request logging enabled.
server = SimpleXMLRPCServer(addr=('localhost', 9000), logRequests=True)
# Expose a function
def list_contents(dir_name):
    """Return the names of the entries in the directory *dir_name*.

    The call is recorded through the ``logging`` module at INFO level
    before the directory is read. Errors raised by ``os.listdir``
    propagate to the caller.
    """
    logging.info('list_contents(%s)', dir_name)
    return os.listdir(dir_name)
# Publish list_contents under its own function name.
server.register_function(list_contents)

# Start the server
try:
    print('Use Control-C to exit')
    server.serve_forever()
except KeyboardInterrupt:
    print('Exiting')
finally:
    # Release the listening socket however the serving loop ends.
    server.server_close()
# xmlrpc_function_client.py
import xmlrpc.client
# The context manager closes the underlying HTTP connection when done.
with xmlrpc.client.ServerProxy('http://localhost:9000') as proxy:
    print(proxy.list_contents('/tmp'))
Alternate API Names
# xmlrpc_alternate_name.py
from xmlrpc.server import SimpleXMLRPCServer
import os
# Listen on the loopback interface, port 9000.
server = SimpleXMLRPCServer(addr=('localhost', 9000))
def list_contents(dir_name):
    """Expose a function with an alternate name.

    Returns the names of the entries in the directory *dir_name*.
    """
    return os.listdir(dir_name)
# Publish the function as 'dir'; its Python name is not registered.
server.register_function(list_contents, 'dir')

try:
    print('Use Control-C to exit')
    server.serve_forever()
except KeyboardInterrupt:
    print('Exiting')
finally:
    # Release the listening socket however the serving loop ends.
    server.server_close()
# xmlrpc_alternate_name_client.py
import xmlrpc.client
# The context manager closes the underlying HTTP connection when done.
with xmlrpc.client.ServerProxy('http://localhost:9000') as proxy:
    print('dir():', proxy.dir('/tmp'))
    try:
        # The server registered the function only as 'dir', so a call by
        # the original name is expected to come back as a Fault.
        print('\nlist_contents():', proxy.list_contents('/tmp'))
    except xmlrpc.client.Fault as err:
        print('\nERROR:', err)
Dotted API Names
# xmlrpc_dotted_name.py
from xmlrpc.server import SimpleXMLRPCServer
import os
# allow_none=True lets the server marshal the None that os.mkdir() and
# os.rmdir() return.
server = SimpleXMLRPCServer(('localhost', 9000), allow_none=True)

# NOTE(security): these registrations let any client that can reach the
# port list, create and remove directories at caller-supplied paths.
# Keep the server bound to localhost or restrict the paths before reuse.
server.register_function(os.listdir, 'dir.list')
server.register_function(os.mkdir, 'dir.create')
server.register_function(os.rmdir, 'dir.remove')

try:
    print('Use Control-C to exit')
    server.serve_forever()
except KeyboardInterrupt:
    print('Exiting')
finally:
    # Release the listening socket however the serving loop ends.
    server.server_close()
# xmlrpc_dotted_name_client.py
import xmlrpc.client
# The context manager closes the underlying HTTP connection when done.
with xmlrpc.client.ServerProxy('http://localhost:9000') as proxy:
    print('BEFORE :', 'EXAMPLE' in proxy.dir.list('/tmp'))
    print('CREATE :', proxy.dir.create('/tmp/EXAMPLE'))
    print('SHOULD EXIST :', 'EXAMPLE' in proxy.dir.list('/tmp'))
    print('REMOVE :', proxy.dir.remove('/tmp/EXAMPLE'))
    print('AFTER :', 'EXAMPLE' in proxy.dir.list('/tmp'))
Arbitrary API Names
# xmlrpc_arbitrary_name.py
from xmlrpc.server import SimpleXMLRPCServer
# Listen on the loopback interface, port 9000.
server = SimpleXMLRPCServer(addr=('localhost', 9000))
def my_function(a, b):
    """Return the product ``a * b``."""
    return a * b
# The published name contains a space, so it is not a valid Python
# identifier; clients must look it up by string.
server.register_function(my_function, 'multiply args')

try:
    print('Use Control-C to exit')
    server.serve_forever()
except KeyboardInterrupt:
    print('Exiting')
finally:
    # Release the listening socket however the serving loop ends.
    server.server_close()
# xmlrpc_arbitrary_name_client.py
import xmlrpc.client
# The context manager closes the underlying HTTP connection when done.
with xmlrpc.client.ServerProxy('http://localhost:9000') as proxy:
    # 'multiply args' cannot be written as an attribute access, so the
    # remote method is fetched with getattr().
    print(getattr(proxy, 'multiply args')(5, 5))
暴露对象的方法
# xmlrpc_instance.py
from xmlrpc.server import SimpleXMLRPCServer
import os
import inspect
# Listen on the loopback interface, port 9000, with request logging enabled.
server = SimpleXMLRPCServer(addr=('localhost', 9000), logRequests=True)
class DirectoryService:
    """Service object offering a single directory-listing method."""

    def list(self, dir_name):
        """Return the names of the entries in the directory *dir_name*."""
        return os.listdir(dir_name)
# Publish the instance's methods under their own names.
server.register_instance(DirectoryService())

try:
    print('Use Control-C to exit')
    server.serve_forever()
except KeyboardInterrupt:
    print('Exiting')
finally:
    # Release the listening socket however the serving loop ends.
    server.server_close()
# xmlrpc_instance_client.py
import xmlrpc.client
# The context manager closes the underlying HTTP connection when done.
with xmlrpc.client.ServerProxy('http://localhost:9000') as proxy:
    print(proxy.list('/tmp'))
# xmlrpc_instance_dotted_names.py
from xmlrpc.server import SimpleXMLRPCServer
import os
import inspect
# Listen on the loopback interface, port 9000, with request logging enabled.
server = SimpleXMLRPCServer(addr=('localhost', 9000), logRequests=True)
class ServiceRoot:
    """Empty namespace object; service instances are attached as attributes."""
class DirectoryService:
    """Service object offering a single directory-listing method."""

    def list(self, dir_name):
        """Return the names of the entries in the directory *dir_name*."""
        return os.listdir(dir_name)
# Attach the service under the attribute 'dir' so it is reachable from
# clients as dir.list.
root = ServiceRoot()
root.dir = DirectoryService()

# NOTE(security): the xmlrpc.server documentation warns that
# allow_dotted_names=True lets clients reach the module's global variables
# and may allow arbitrary code execution. Use it only on a closed,
# trusted network.
server.register_instance(root, allow_dotted_names=True)

try:
    print('Use Control-C to exit')
    server.serve_forever()
except KeyboardInterrupt:
    print('Exiting')
finally:
    # Release the listening socket however the serving loop ends.
    server.server_close()
# xmlrpc_instance_dotted_names_client.py
import xmlrpc.client
# The context manager closes the underlying HTTP connection when done.
with xmlrpc.client.ServerProxy('http://localhost:9000') as proxy:
    print(proxy.dir.list('/tmp'))
Dispatching Calls
# xmlrpc_instance_with_prefix.py
from xmlrpc.server import SimpleXMLRPCServer
import os
import inspect
# Listen on the loopback interface, port 9000, with request logging enabled.
server = SimpleXMLRPCServer(addr=('localhost', 9000), logRequests=True)
def expose(f):
    """Decorator to set exposed flag on a function.

    Sets ``f.exposed = True`` and returns the same function object.
    """
    f.exposed = True
    return f
def is_exposed(f):
    """Test whether another function should be publicly exposed.

    Returns the object's ``exposed`` attribute, or False when it has none.
    """
    return getattr(f, 'exposed', False)
class MyService:
    """Service whose methods are callable only as ``<PREFIX>.<name>``.

    Dispatch is handled by ``_dispatch``, which accepts a call only when
    the name carries the prefix and the target is flagged as exposed.
    """

    PREFIX = 'prefix'

    def _dispatch(self, method, params):
        """Resolve and call the remote method named *method*.

        Args:
            method: Full method name sent by the client.
            params: Sequence of positional arguments for the call.

        Returns:
            Whatever the resolved method returns.

        Raises:
            Exception: If *method* lacks the prefix, names no attribute
                of this object, or names one that is not exposed.
        """
        # Remove our prefix from the method name
        prefix = self.PREFIX + '.'
        if not method.startswith(prefix):
            raise Exception(
                'method "{}" is not supported'.format(method)
            )
        # Slice by the prefix length rather than splitting on the first
        # dot, so a PREFIX that itself contains a dot still works.
        method_name = method[len(prefix):]
        # Unknown names resolve to None and are rejected below with the
        # same message as unexposed ones, instead of an AttributeError.
        func = getattr(self, method_name, None)
        if not is_exposed(func):
            raise Exception(
                'method "{}" is not supported'.format(method)
            )
        return func(*params)

    @expose
    def public(self):
        """Exposed method; callable by clients as ``prefix.public``."""
        return 'This is public'

    def private(self):
        """Method without the exposed flag; ``_dispatch`` rejects it."""
        return 'This is private'
# MyService defines _dispatch(), which the server calls for every request.
server.register_instance(MyService())

try:
    print('Use Control-C to exit')
    server.serve_forever()
except KeyboardInterrupt:
    print('Exiting')
finally:
    # Release the listening socket however the serving loop ends.
    server.server_close()
# xmlrpc_instance_with_prefix_client.py
import xmlrpc.client
# The context manager closes the underlying HTTP connection when done.
with xmlrpc.client.ServerProxy('http://localhost:9000') as proxy:
    print('public():', proxy.prefix.public())

    # Server-side rejections arrive as xmlrpc.client.Fault. Only that is
    # caught, so connection and protocol errors still surface.
    try:
        print('private():', proxy.prefix.private())
    except xmlrpc.client.Fault as err:
        print('\nERROR:', err)

    try:
        print('public() without prefix:', proxy.public())
    except xmlrpc.client.Fault as err:
        print('\nERROR:', err)
Introspection API
# xmlrpc_introspection.py
from xmlrpc.server import (SimpleXMLRPCServer,
list_public_methods)
import os
import inspect
# Listen on the loopback interface, port 9000, with request logging enabled.
server = SimpleXMLRPCServer(addr=('localhost', 9000), logRequests=True)

# Register the system.listMethods, system.methodHelp and
# system.methodSignature introspection functions.
server.register_introspection_functions()
class DirectoryService:
    """Directory-listing service with introspection hooks.

    ``_listMethods`` and ``_methodHelp`` are the hooks the XML-RPC server
    consults for ``system.listMethods`` and ``system.methodHelp``.
    """

    def _listMethods(self):
        """Return the names of this object's public callable attributes."""
        return list_public_methods(self)

    def _methodHelp(self, method):
        """Return the help text for *method*.

        Returns an empty string when *method* is not a public method or
        has no docstring. The result is always a string because None
        cannot be marshalled unless the server enables ``allow_none``.
        """
        if method not in self._listMethods():
            return ''
        return inspect.getdoc(getattr(self, method)) or ''

    def list(self, dir_name):
        """list(dir_name) => [<filenames>]
        Returns a list containing the contents of
        the named directory.
        """
        return os.listdir(dir_name)
# Publish the instance's methods under their own names.
server.register_instance(DirectoryService())

try:
    print('Use Control-C to exit')
    server.serve_forever()
except KeyboardInterrupt:
    print('Exiting')
finally:
    # Release the listening socket however the serving loop ends.
    server.server_close()
# xmlrpc_introspection_client.py
import xmlrpc.client
# The context manager closes the underlying HTTP connection when done.
with xmlrpc.client.ServerProxy('http://localhost:9000') as proxy:
    # Print each advertised method name followed by its help text.
    for method_name in proxy.system.listMethods():
        print('=' * 60)
        print(method_name)
        print('-' * 60)
        print(proxy.system.methodHelp(method_name))
        print()