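Microservice architecture is a way of designing software in which an application is composed of a suite of independently deployable services.
Each functional block handles a different task and exposes only a common endpoint for communication (usually a message queue whose protocol and interface are already defined).
Today, splitting a project into multiple independent, scalable services is one of the most reliable ways to keep a codebase able to evolve.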
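Nameko is a microservice framework written in Python.
It supports RPC calls carried over a RabbitMQ message queue, as well as HTTP calls.
It is small and concise, simple yet powerful;
it lets you focus on application logic;
RPC services are implemented on top of the RabbitMQ message broker.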
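The idea of RPC over a message queue can be sketched with the standard library alone. This is only an illustration, not how Nameko is implemented: the two `queue.Queue` objects stand in for broker queues, and `greeting_worker`, `call_remote` and the message fields are hypothetical names chosen for this sketch.

```python
import queue
import threading
import uuid

# Hypothetical in-process stand-ins for broker queues (RabbitMQ in a real setup).
request_queue = queue.Queue()
reply_queue = queue.Queue()


def greeting_worker():
    """Consume one request message and publish the matching reply."""
    message = request_queue.get()  # blocks until a request arrives
    result = "Hello, {}!".format(message["args"]["name"])
    reply_queue.put({"correlation_id": message["correlation_id"], "result": result})


def call_remote(method, **args):
    """Send a request and wait for the reply that carries the same correlation id."""
    correlation_id = str(uuid.uuid4())
    request_queue.put({"correlation_id": correlation_id, "method": method, "args": args})
    reply = reply_queue.get(timeout=5)
    assert reply["correlation_id"] == correlation_id
    return reply["result"]


worker = threading.Thread(target=greeting_worker)
worker.start()
greeting = call_remote("hello", name="nameko")
worker.join()
print(greeting)  # Hello, nameko!
```

The caller knows only the queue and the message shape, never the worker itself, which is what allows the two sides to be deployed independently.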
# Install the nameko library
pip install nameko
# Install RabbitMQ: Docker is recommended
docker pull rabbitmq:management
docker run --hostname my-rabbit --name rabbitmq-borg -p 15672:15672 -p 25672:25672 -p 5672:5672 -d rabbitmq:management
Tip: the RabbitMQ management page of this container is at localhost:15672; the default username and password are guest and guest.
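Before starting any service it can help to confirm that the published ports are reachable. The following is a minimal sketch that assumes the container runs on the local machine with the port mappings shown above; `port_open` is a hypothetical helper, not part of Nameko or RabbitMQ.

```python
import socket


def port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # 5672 is the AMQP port, 15672 the management UI (both from the docker command).
    for label, port in (("AMQP", 5672), ("management UI", 15672)):
        state = "open" if port_open("127.0.0.1", port) else "closed"
        print("{} port {}: {}".format(label, port, state))
```

An open port only shows that something is listening; logging in at localhost:15672 remains the real check.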
[nah-meh-koh]
A microservices framework for Python that lets service developers concentrate on application logic and encourages testability.
A nameko service is just a class:
# helloworld.py
from nameko.rpc import rpc


class GreetingService:
    name = "greeting_service"

    @rpc
    def hello(self, name):
        return "Hello, {}!".format(name)
You can run it in a shell:
$ nameko run helloworld
starting services: greeting_service
...
And play with it from another:
$ nameko shell
>>> n.rpc.greeting_service.hello(name="ナメコ")
'Hello, ナメコ!'
Reference: https://zhuanlan.zhihu.com/p/95229201?from_voters_page=true
# config.yml
AMQP_URI: 'pyamqp://guest:guest@127.0.0.1'
WEB_SERVER_ADDRESS: '0.0.0.0:8888'
rpc_exchange: 'nameko-rpc'
max_workers: 10
parent_calls_tracked: 10
LOGGING:
    version: 1
    handlers:
        console:
            class: logging.StreamHandler
    root:
        level: DEBUG
        handlers: [console]
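To see what the AMQP_URI value encodes, the sketch below splits it with the standard library. It assumes the URI follows the usual `scheme://user:password@host[:port]` layout and that a missing port means the standard AMQP port 5672 (the one published by the docker command); `describe_amqp_uri` is a hypothetical helper.

```python
from urllib.parse import urlsplit

DEFAULT_AMQP_PORT = 5672  # assumption: standard AMQP port when the URI omits one


def describe_amqp_uri(uri):
    """Break a broker URI into its parts for inspection."""
    parts = urlsplit(uri)
    return {
        "scheme": parts.scheme,
        "user": parts.username,
        "password": parts.password,
        "host": parts.hostname,
        "port": parts.port or DEFAULT_AMQP_PORT,
    }


info = describe_amqp_uri("pyamqp://guest:guest@127.0.0.1")
print(info)
```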
# server.py
# Server side
from nameko.rpc import rpc


class GreetServer(object):
    name = "greet_server"

    # Use the @rpc decorator to expose a method as an RPC entrypoint
    @rpc
    def hello(self, name):
        return name

# Run the server service
nameko run --config config.yml server
nameko run --config config.yml server:GreetServer
Output:
starting <QueueConsumer at 0x7f759cbb3da0>
waiting for consumer ready <QueueConsumer at 0x7f759cbb3da0>
Connected to amqp://guest:**@127.0.0.1:5672//
setting up consumers <QueueConsumer at 0x7f759cbb3da0>
consumer started <QueueConsumer at 0x7f759cbb3da0>
started <QueueConsumer at 0x7f759cbb3da0>
# http_server.py
# Expose the service over HTTP
import json

from nameko.web.handlers import http


class HttpServer:
    name = "http_server"

    @http("GET", "/get/<int:value>")
    def get_method(self, request, value):
        obj = {'value': value}
        return json.dumps(obj)

    @http('POST', '/post')
    def post_method(self, request):
        data = request.get_data(as_text=True)
        return u"received: {}".format(data)

# Run the http_server service
nameko run --config config.yml http_server
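The two endpoints can be called from any HTTP client. Here is a minimal standard-library sketch; the helper names are hypothetical, and the base URL `http://127.0.0.1:8888` is an assumption taken from the WEB_SERVER_ADDRESS in config.yml.

```python
from urllib.request import Request, urlopen


def build_get(base_url, value):
    """Request for GET /get/<int:value>."""
    return Request("{}/get/{}".format(base_url, int(value)), method="GET")


def build_post(base_url, text):
    """Request for POST /post with a plain-text body."""
    return Request("{}/post".format(base_url), data=text.encode("utf-8"), method="POST")


def send(request, timeout=5):
    """Send the request and return the response body as text (needs a running service)."""
    with urlopen(request, timeout=timeout) as response:
        return response.read().decode("utf-8")


get_request = build_get("http://127.0.0.1:8888", 42)
post_request = build_post("http://127.0.0.1:8888", "hello")
print(get_request.get_method(), get_request.full_url)
print(post_request.get_method(), post_request.full_url)
```

With http_server running, `send(get_request)` should return the JSON produced by `get_method`, and `send(post_request)` the "received: ..." text from `post_method`.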
# client.py
# Client
import cProfile

from nameko.standalone.rpc import ClusterRpcProxy

config = {
    'AMQP_URI': "pyamqp://guest:guest@localhost"
}


def test():
    with ClusterRpcProxy(config) as cluster_rpc:
        rs = cluster_rpc.greet_server.hello("hellø")


# Use cProfile to measure performance
if __name__ == '__main__':
    cProfile.run("test()")

python client.py
5914 function calls (5783 primitive calls) in 0.076 seconds
One RPC call costs about 76 ms on average, i.e. 20
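The cProfile run above times a single call, and that figure probably also includes the time ClusterRpcProxy needs to set up its connection. A fairer average comes from timing many calls in a loop. The sketch below shows the measurement only: `average_latency` is a hypothetical helper and `fake_rpc` is a stand-in for the real call, which would be something like `lambda: cluster_rpc.greet_server.hello("x")` inside one `with ClusterRpcProxy(config)` block.

```python
import time


def average_latency(call, repeats=100):
    """Return the mean wall-clock time of call() in seconds over `repeats` runs."""
    start = time.perf_counter()
    for _ in range(repeats):
        call()
    return (time.perf_counter() - start) / repeats


def fake_rpc():
    """Stand-in for a remote call; sleeps briefly instead of using the broker."""
    time.sleep(0.001)


avg = average_latency(fake_rpc, repeats=20)
print("average: {:.2f} ms, about {:.0f} calls/s".format(avg * 1000, 1 / avg))
```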
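# Test with wrk
wrk -t1 -d20s -c100 http://127.0.0.1:8000/get/42
Note: 1 thread, 20 seconds duration, 100 connections. Port 8000 is nameko's default web server port; with WEB_SERVER_ADDRESS set to '0.0.0.0:8888' as in the config.yml above, use port 8888 instead.
# Test results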
Running 20s test @ http://127.0.0.1:8000/get/42
  1 threads and 100 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    74.17ms   11.06ms 141.23ms   63.00%
    Req/Sec     1.35k   204.27     1.72k    62.50%
  26934 requests in 20.03s, 3.34MB read
Requests/sec:   1344.62
Transfer/sec:    170.70KB
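Note: the response time is about the same as for a direct RPC request.
A throughput of 1344 requests/sec is only average.
QPS is not the same as the number of concurrent connections: QPS is the number of HTTP requests completed per second, while concurrency is the number of requests the system is handling at the same moment.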
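The two quantities are linked by Little's law: requests in flight ≈ throughput × average latency. As a quick check, the sketch below plugs in the wrk numbers reported above; the variable names are only illustrative.

```python
# Figures taken from the wrk output above.
qps = 1344.62                  # Requests/sec
avg_latency_s = 74.17 / 1000   # average latency, converted from ms to seconds

# Little's law: average number of requests in flight.
in_flight = qps * avg_latency_s
print(round(in_flight, 1))  # 99.7
```

The result is close to the 100 connections wrk kept open, which suggests that throughput in this run was bounded by per-request latency at that concurrency level.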
Reference: https://blog.csdn.net/huobanjishijian/article/details/80346212
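# Install the nameko framework and yagmail
pip install nameko
pip install yagmail
# Install Flask and Flasgger
pip install flask
pip install flasgger
# Notes
Flasgger: a Flask extension that serves Swagger UI and API documentation for a Flask application.
Swagger is a tool that generates online documentation for RESTful interfaces and lets you exercise them; with it you can debug APIs that follow REST conventions clearly and conveniently.
Flask is a web micro-framework implemented in Python; Flasgger is the Swagger integration for Flask, which makes it easier to debug web API endpoints built with Flask.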
# service.py
# Services
import yagmail
from nameko.rpc import rpc, RpcProxy


class Mail(object):
    name = 'mail'

    @rpc
    def send(self, to, subject, contents):
        # Tip: see the Dynaconf settings module for managing settings such as these credentials
        yag = yagmail.SMTP(user='myname@gmail', password='mypassword')
        # On Python 3 these arguments are already str, so no .encode() is needed
        yag.send(to=to, subject=subject, contents=[contents])


class Compute(object):
    name = 'compute'
    mail = RpcProxy('mail')

    @rpc
    def compute(self, operation, value, other, email):
        operations = {
            'sum': lambda x, y: int(x) + int(y),
            'mul': lambda x, y: int(x) * int(y),
            'div': lambda x, y: int(x) / int(y),
            'sub': lambda x, y: int(x) - int(y)
        }
        try:
            result = operations[operation](value, other)
        except Exception as e:
            # `async` is a reserved word since Python 3.7; nameko provides call_async
            self.mail.send.call_async(email, 'An error occurred', str(e))
            raise
        else:
            self.mail.send.call_async(
                email, "Your operation is complete!", "The result is: %s" % result
            )
            return result
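The try/except/else flow of `compute` can be exercised without a broker or an SMTP account. The sketch below copies the operations table and replaces the mail service with `FakeMail`, a hypothetical stand-in that only records what would have been sent; it is a way to reason about the control flow, not a replacement for testing the real services.

```python
OPERATIONS = {
    'sum': lambda x, y: int(x) + int(y),
    'mul': lambda x, y: int(x) * int(y),
    'div': lambda x, y: int(x) / int(y),
    'sub': lambda x, y: int(x) - int(y),
}


class FakeMail:
    """Hypothetical stand-in for the mail service: records messages instead of sending."""

    def __init__(self):
        self.sent = []

    def send(self, to, subject, contents):
        self.sent.append((to, subject, contents))


def compute(mail, operation, value, other, email):
    """Same flow as Compute.compute: notify on failure, re-raise, else notify success."""
    try:
        result = OPERATIONS[operation](value, other)
    except Exception as exc:
        mail.send(email, 'An error occurred', str(exc))
        raise
    else:
        mail.send(email, "Your operation is complete!", "The result is: %s" % result)
        return result


mail = FakeMail()
print(compute(mail, 'sum', 30, 10, 'user@example.com'))  # 40
try:
    compute(mail, 'div', 30, 0, 'user@example.com')
except ZeroDivisionError:
    print(mail.sent[-1][1])  # An error occurred
```

Note that an unknown operation name also takes the error path, because the dictionary lookup raises KeyError inside the try block.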
# Start and run the services in a shell
nameko run <module_name>
nameko run service # module
nameko run service:Compute # module[:service class]
nameko run service:Compute service:Mail
nameko run service --broker amqp://guest:guest@localhost
# Console output
starting services: compute, mail
Connected to amqp://guest:**@127.0.0.1:5672//
Connected to amqp://guest:**@127.0.0.1:5672//
^C
stopping services: compute, mail
# Tip
In production, it is recommended to use supervisord instead of running the command from a console.
# Enter the RPC client and test through the n.rpc object
nameko shell --broker amqp://guest:guest@localhost
>>> n.rpc.mail.send("18832023602@163.com", "testing", "Just testing")
>>> n.rpc.compute.compute('sum', 30, 10, "18832023602@163.com")
>>> n.rpc.compute.compute('sub', 30, 10, "18832023602@163.com")
>>> n.rpc.compute.compute('mul', 30, 10, "18832023602@163.com")
>>> n.rpc.compute.compute('div', 30, 10, "18832023602@163.com")
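Proxy objects: ClusterProxy, ServiceProxy, MethodProxy. Attribute access on n.rpc builds a ServiceProxy for whatever name is given, so n.rpc.Compute below returns a proxy even though the registered service name is compute.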
>>> n
<module 'nameko'>
>>> n.rpc
<nameko.standalone.rpc.ClusterProxy object at 0x7fdef64b66d8>
>>> n.rpc.Compute
<nameko.rpc.ServiceProxy object at 0x7fdef460feb8>
>>> n.rpc.compute
<nameko.rpc.ServiceProxy object at 0x7fdef460fe48>
>>> n.rpc.Mail
<nameko.rpc.ServiceProxy object at 0x7fdef460ff28>
>>> n.rpc.mail
<nameko.rpc.ServiceProxy object at 0x7fdef460fe10>
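The shell session above returns a proxy for both `Compute` and `compute`, which suggests that proxies are built from the attribute name without checking that such a service is running. A much simplified sketch of that pattern follows; the classes borrow the names seen in the output but are hypothetical and not nameko's actual implementation.

```python
class MethodProxy:
    """Represents one remote method; a real proxy would publish a message when called."""

    def __init__(self, service_name, method_name):
        self.service_name = service_name
        self.method_name = method_name

    def __repr__(self):
        return "<MethodProxy {}.{}>".format(self.service_name, self.method_name)


class ServiceProxy:
    """Built for any service name; nothing is checked until a call is made."""

    def __init__(self, service_name):
        self.service_name = service_name

    def __getattr__(self, method_name):
        # Only reached for attributes that are not set on the instance.
        return MethodProxy(self.service_name, method_name)


class ClusterProxy:
    def __getattr__(self, service_name):
        return ServiceProxy(service_name)


rpc = ClusterProxy()
print(rpc.Compute.service_name)  # Compute
print(rpc.compute.compute)       # <MethodProxy compute.compute>
```

In this sketch, as in the session above, obtaining a proxy proves nothing about the service; only an actual call would reveal whether anything is listening under that name.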
# app.py
# Flask API
from flask import Flask, request
from flasgger import Swagger
from nameko.standalone.rpc import ClusterRpcProxy

app = Flask(__name__)
Swagger(app)
CONFIG = {'AMQP_URI': "amqp://guest:guest@localhost"}


@app.route('/compute', methods=['POST'])
def compute():
    """
    Micro Service Based Compute and Mail API
    This API is made with Flask, Flasgger and Nameko
    ---
    parameters:
      - name: body
        in: body
        required: true
        schema:
          id: data
          properties:
            operation:
              type: string
              enum:
                - sum
                - mul
                - sub
                - div
            email:
              type: string
            value:
              type: integer
            other:
              type: integer
    responses:
      200:
        description: Please wait for the calculation, you'll receive an email with the results
    """
    operation = request.json.get('operation')
    value = request.json.get('value')
    other = request.json.get('other')
    email = request.json.get('email')
    msg = "Please wait for the calculation, you'll receive an email with the results"
    subject = "API Notification"
    with ClusterRpcProxy(CONFIG) as rpc:
        # asynchronously spawn an email notification
        # (`async` is a reserved word since Python 3.7; nameko provides call_async)
        rpc.mail.send.call_async(email, subject, msg)
        # asynchronously spawn the compute task
        result = rpc.compute.compute.call_async(operation, value, other, email)
        return msg, 200


if __name__ == '__main__':
    app.run(debug=True)
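# Run the Flask service
python app.py
* Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
# Open the Flasgger Swagger UI
http://localhost:5000/apidocs/
http://localhost:5000/apidocs/index.html
In the Flasgger UI you can interact with the API and publish tasks to the queue for the services to consume.
The shell running the services shows their logs, printed messages and errors. You can also open the RabbitMQ management panel to see how messages in the queues are being processed.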
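Besides the Swagger UI, the /compute endpoint can be called from a script. The sketch below builds the POST request with the standard library. It assumes the Flask app listens on http://localhost:5000 as shown above; `build_compute_request` is a hypothetical helper, and the allowed operations are taken from the enum in the endpoint's docstring. The JSON content type matters because the view reads `request.json`.

```python
import json
from urllib.request import Request

ALLOWED_OPERATIONS = ("sum", "mul", "sub", "div")  # from the docstring enum


def build_compute_request(base_url, operation, value, other, email):
    """Build a JSON POST request for the /compute endpoint."""
    if operation not in ALLOWED_OPERATIONS:
        raise ValueError("unsupported operation: {}".format(operation))
    body = json.dumps(
        {"operation": operation, "value": value, "other": other, "email": email}
    ).encode("utf-8")
    return Request(
        base_url + "/compute",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_compute_request("http://localhost:5000", "sum", 30, 10, "user@example.com")
print(req.get_method(), req.full_url)
```

Passing `req` to `urllib.request.urlopen` while the Flask app and both services are running should return the "please wait" message, with the result arriving by email.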
Reference: https://zhuanlan.zhihu.com/p/82924225
nameko run --help
usage: nameko run [-h] [--config CONFIG] [--broker BROKER]
                  [--backdoor-port BACKDOOR_PORT]
                  module[:service class] [module[:service class] ...]

Run nameko services. Given a python path to a module containing one or more
nameko services, will host and run them. By default this will try to find
classes that look like services (anything with nameko entrypoints), but a
specific service can be specified via ``nameko run module:ServiceClass``.

positional arguments:
  module[:service class]
                        python path to one or more service classes to run

optional arguments:
  -h, --help            show this help message and exit
  --config CONFIG       The YAML configuration file
  --broker BROKER       RabbitMQ broker url
  --backdoor-port BACKDOOR_PORT
                        Specify a port number to host a backdoor, which can be
                        connected to for an interactive interpreter within the
                        running service process using `nameko backdoor`.
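The `module[:service class]` argument format can be illustrated with a few lines of Python. This is a sketch of the idea only, not nameko's own loader: `resolve_service_spec` is a hypothetical helper, and it skips the discovery step the help text describes for the case where no class is named.

```python
import importlib


def resolve_service_spec(spec):
    """Split 'module[:ClassName]' and import what it names.

    Returns (module, cls); cls is None when no class name was given.
    """
    module_name, _, class_name = spec.partition(":")
    module = importlib.import_module(module_name)
    cls = getattr(module, class_name) if class_name else None
    return module, cls


# Demonstrated on a standard-library module so the sketch runs anywhere.
module, cls = resolve_service_spec("collections:OrderedDict")
print(module.__name__, cls.__name__)  # collections OrderedDict
```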