from socketserver import ThreadingMixIn
from xmlrpc.server import SimpleXMLRPCServer
from xmlrpc.server import SimpleXMLRPCRequestHandler
class ThreadXMLRPCServer(ThreadingMixIn, SimpleXMLRPCServer):
    """XML-RPC server that handles each request in its own thread.

    ThreadingMixIn is listed first so that its request-processing methods
    take precedence over the ones SimpleXMLRPCServer inherits.
    """
class MultRequestHandler(SimpleXMLRPCRequestHandler):
    """Request handler that only accepts XML-RPC calls posted to ``/RPC2``."""

    # Only these URL paths are treated as valid XML-RPC endpoints.
    rpc_paths = ('/RPC2',)
class Calculate:
    """Basic arithmetic operations, registered on the XML-RPC server.

    Every method prints a short label for the operation to stdout before
    returning its result.
    """

    def add(self, x, y):
        """Return ``x + y``."""
        print('加法')
        return x + y

    def multiply(self, x, y):
        """Return ``x * y``."""
        print('乘法')
        return x * y

    def subtract(self, x, y):
        """Return the signed difference ``x - y``."""
        print('减法')
        return x - y

    def divide(self, x, y):
        """Return ``x / y`` using true division.

        Raises:
            ZeroDivisionError: if ``y`` is zero.
        """
        print('除法')
        return x / y
def startRPCServer(host='0.0.0.0', port=5000):
    """Start the multi-threaded XML-RPC server and serve requests forever.

    Registers a ``Calculate`` instance plus the introspection and multicall
    helpers, then blocks in ``serve_forever()``.

    Args:
        host: Address to bind to. The default ``'0.0.0.0'`` listens on every
            network interface.
        port: TCP port to listen on.
    """
    with ThreadXMLRPCServer((host, port),
                            requestHandler=MultRequestHandler,
                            allow_none=True) as mserver:
        mserver.register_instance(Calculate())
        # Provide the server.system.* methods, e.g. server.system.listMethods().
        mserver.register_introspection_functions()
        # Register the XML-RPC multicall functions in the system namespace.
        mserver.register_multicall_functions()
        # Single built-in or custom functions can be registered as well:
        #     mserver.register_function(pow)
        #
        #     @mserver.register_function(name='add')
        #     def madd(a, b):
        #         return a + b
        mserver.serve_forever()


startRPCServer()
1.安装第三方包
pip install django-json-rpc
2.在django项目settings配置文件中注册 'jsonrpc'
# Applications shipped with Django itself.
_DJANGO_CONTRIB_APPS = [
    'django.contrib.' + app_name
    for app_name in (
        'admin',
        'auth',
        'contenttypes',
        'sessions',
        'messages',
        'staticfiles',
    )
]

# Third-party applications; 'jsonrpc' provides the RPC service.
_THIRD_PARTY_APPS = ['corsheaders', 'jsonrpc']

INSTALLED_APPS = _DJANGO_CONTRIB_APPS + _THIRD_PARTY_APPS
3.在视图文件 views.py 中定义 RPC 方法
from jsonrpc import jsonrpc_method
def mah_authenticate(username, password):
    """Example custom authentication callback.

    Prints a marker number to stdout and returns ``'ok'`` without checking
    the credentials it is given.

    Args:
        username: User name supplied by the caller (unused).
        password: Password supplied by the caller (unused).

    Returns:
        The string ``'ok'``.
    """
    print(1111111111111111111111111111111111111111)
    return 'ok'
@jsonrpc_method(
    'applyPaperReport(reportInfo=dict)',  # method name and parameter signature
    # authentication_arguments=['admin', 'admin123456'],  # user authentication
    # authenticated=True,  # enable user authentication
    # authenticated=mah_authenticate,  # custom authentication callable
    validate=True,  # whether to validate parameter types
)
def applyPaperReport(request, reportInfo):
    """JSON-RPC method ``applyPaperReport`` taking one dict parameter.

    Args:
        request: First argument supplied by the dispatcher (presumably the
            Django request object; unused here).
        reportInfo: Dict payload sent by the caller (unused here).

    Returns:
        A fixed placeholder dict with ``code``, ``message`` and ``data`` keys.
    """
    return {'code': 200, 'message': 'message', 'data': 'data'}
4.在 urls.py 中注册路由
from django.urls import re_path
from jsonrpc import jsonrpc_site
urlpatterns = [
    # RPC endpoint. The pattern is anchored so that only "callFun" (with an
    # optional trailing slash) is routed to the JSON-RPC dispatcher; an
    # unanchored regex would match any path merely containing "callFun".
    re_path(r'^callFun/?$', jsonrpc_site.dispatch),
]
django服务rpc互调用
from jsonrpc.proxy import ServiceProxy

url = "http://127.0.0.1:8000/callFun"
s = ServiceProxy(url)
# Call the registered method by its own name. Each extra attribute access on
# the proxy extends the remote method name with a dot, so
# `s.applyPaperReport.data()` would address "applyPaperReport.data" instead of
# the "applyPaperReport(reportInfo=dict)" method the server defines.
res = s.applyPaperReport({"taskId": "20220414959"})
print(res)
返回值
{
'id': '425d9d70-ccbe-11ea-a8ca-7470fd0bd93a',
'error': None,
'result': [
{'id': 1, 'name': 'one', 'age': 22},
{'id': 2, 'name': 'two', 'age': 23},
{'id': 3, 'name': 'three', 'age': 24}
],
'jsonrpc': '2.0'
}
api调用
import requests

url = "http://127.0.0.1:8000/callFun"
# JSON-RPC 2.0 request envelope; "params" is a positional argument list whose
# single element is the dict passed to the remote method.
post_data = {
    "id": "425d9d70-ccbe-11ea-a8ca-7470fd0bd93ad",
    "method": "applyPaperReport",
    "params": [{
        "taskId": "20220414959",
    }],
    "jsonrpc": "2.0",
}
# Bound the wait: without a timeout, requests may block indefinitely when the
# server never answers.
res = requests.post(url, json=post_data, timeout=10)
print(res.json())
返回值
{
'id': '425d9d70-ccbe-11ea-a8ca-7470fd0bd93a',
'error': None,
'result': [
{'id': 1, 'name': 'one', 'age': 22},
{'id': 2, 'name': 'two', 'age': 23},
{'id': 3, 'name': 'three', 'age': 24}
],
'jsonrpc': '2.0'
}
安装第三方包
pip install Flask-JSONRPC
创建 Flask 应用程序:
from flask import Flask
# Create the Flask application object, named after the current module.
app = Flask(__name__)
创建 JSON-RPC 视图:
# NOTE(review): confirm that `flask_jsonrpc.validate` (ValidationError,
# validate) exists in the installed Flask-JSONRPC version.
from flask_jsonrpc import JSONRPC
from flask_jsonrpc.validate import ValidationError, validate
# Bind a JSONRPC object to the Flask app; '/api' is presumably the URL path
# the JSON-RPC service is served under -- confirm against the library docs.
jsonrpc = JSONRPC(app, '/api')
@jsonrpc.method('App.add')
@validate('x', int)
@validate('y', int)
def add(x, y):
    """JSON-RPC method ``App.add``: return the sum of ``x`` and ``y``.

    Args:
        x: First addend; values greater than 10 are rejected.
        y: Second addend.

    Returns:
        ``x + y``.

    Raises:
        ValidationError: if ``x`` is greater than 10.
    """
    if x > 10:
        # The message describes the actual check (a numeric upper bound on x).
        raise ValidationError("x must not be greater than 10.")
    return x + y
运行 Flask
# Start the Flask server only when this file is executed as a script,
# not when it is imported.
if __name__ == '__main__':
    app.run()
API 调用方式与上面的 Django 示例相同(请求地址改为 Flask 服务的 /api,method 改为 'App.add')