当前位置: 首页 > 工具软件 > Nameko > 使用案例 >

nameko使用记录

宦子琪
2023-12-01

微服务

微服务架构,一种软件应用设计方式,其使得应用可以由多个独立部署的服务以服务套件的形式组成。

不同任务的功能块,只提供用于通讯的通用指向(通常是已经将通讯协议和接口定义好的消息队列)。

现如今,将项目拆分成多个独立的、可扩展的服务是保障代码演变的最好选择。

Nameko

nameko是什么?

Nameko是一个用python语言写的微服务框架,

支持通过 rabbitmq 消息队列传递的 rpc 调用,也支持 http 调用。

小巧简洁,简单且强大;

可以让你专注于应用逻辑

nameko能干啥?

可通过RabbitMq消息组件来实现RPC服务

nameko怎么用?

# 安装 nameko 库
pip install nameko
# 安装 rabbitmq: 推荐使用 docker 
docker pull rabbitmq:management
docker run --hostname my-rabbit --name rabbitmq-borg -p 15672:15672 -p 25672:25672 -p 5672:5672 -d rabbitmq:management
提示:rabbitmq docker 管理页面为 localhost:15672,默认用户名密码 guest、 guest。

[nah-meh-koh]

A microservices framework for Python that lets service developers concentrate on application logic and encourages testability.

A nameko service is just a class:

# helloworld.py

from nameko.rpc import rpc

class GreetingService:
    name = "greeting_service"

    @rpc
    def hello(self, name):
        return "Hello, {}!".format(name)

You can run it in a shell:

$ nameko run helloworld
starting services: greeting_service
...

And play with it from another:

$ nameko shell
>>> n.rpc.greeting_service.hello(name="ナメコ")
'Hello, ナメコ!'

示例1

参考文档:https://zhuanlan.zhihu.com/p/95229201?from_voters_page=true

配置项 config.yml

AMQP_URI: 'pyamqp://guest:guest@127.0.0.1'
WEB_SERVER_ADDRESS: '0.0.0.0:8888'
rpc_exchange: 'nameko-rpc'
max_workers: 10
parent_calls_tracked: 10

LOGGING:
    version: 1
    handlers:
        console:
            class: logging.StreamHandler
    root:
        level: DEBUG
        handlers: [console]

服务端 server.py

# server.py
# 服务端
from nameko.rpc import rpc

class GreetServer(object):
    name = "greet_server"

    @rpc
    def hello(self, name):
        return name
    # 使用@rpc 装饰器定义RPC服务
# 运行server服务 
nameko run --config config.yml server 
nameko run --config config.yml server:GreetingService
输出:
starting <QueueConsumer at 0x7f759cbb3da0>
waiting for consumer ready <QueueConsumer at 0x7f759cbb3da0>
Connected to amqp://guest:**@127.0.0.1:5672//
setting up consumers <QueueConsumer at 0x7f759cbb3da0>
consumer started <QueueConsumer at 0x7f759cbb3da0>
started <QueueConsumer at 0x7f759cbb3da0>

服务端 server_http.py

# http_server.py
# 封装成http
import json
from nameko.web.handlers import http

class HttpServer:
    name = "http_server"

    @http("GET", "/get/<int:value>")
    def get_method(self, request, value):
        obj = {'value': value}
        return json.dumps(obj)

    @http('POST', '/post')
    def post_method(self, request):
        data = request.get_data(as_text=True)
        return u"received: {}".format(data)
# 运行http_server服务
nameko run --config config.yml http_server 

测试

cProfile测试
# client.py
# 客户端
import cProfile
from nameko.standalone.rpc import ClusterRpcProxy
# 使用cProfile测试性能

config = {
    'AMQP_URI': "pyamqp://guest:guest@localhost"
}

def test():
    with ClusterRpcProxy(config) as cluster_rpc:
        rs = cluster_rpc.greet_server.hello("hellø")

if __name__ == '__main__':
    cProfile.run("test()")
python client.py
	5914 function calls (5783 primitive calls) in 0.076 seconds
平均1次调用RPC开销在76毫秒,即20
wrt测试http
# 使用wrt测试
wrk -t1 -d20s -c100 http://127.0.0.1:8000/get/42
说明:1个线程,持续20秒,100个连接
# 测试结果
Running 20s test @ http://127.0.0.1:8000/get/42
  1 threads and 100 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    74.17ms   11.06ms 141.23ms   63.00%
    Req/Sec     1.35k   204.27     1.72k    62.50%
  26934 requests in 20.03s, 3.34MB read
Requests/sec:   1344.62
Transfer/sec:    170.70KB
说明:响应时间和直接RPC请求差不多 
QPS并发量1344/sec,只能说一般般吧 
QPS不等于并发连接数,QPS是每秒HTTP请求数量,并发连接数是系统同时处理的请求数量;

示例2

参考文档:https://blog.csdn.net/huobanjishijian/article/details/80346212

# 安装 nameko 框架和 yagmail
pip install nameko
pip install yagmail
# 安装 Flask 和 Flasgger
pip install flask
pip install flasgger
# 说明
flasgger: A swagger API: 虚拟API
Swagger是一款Restful接口的文档在线自动生成+功能测试功能软件;通过swagger能够清晰、便捷地调试符合Restful规范的API;
Flask是一款Python实现的Web开发微框架;在flask框架中使用的swagger即为flasgger,flasgger是flask支持的swagger UI,便于调试使用flask框架搭建的web api接口;

# 编码实现 nameko 的 RPC 服务

# service.py
# 服务

import yagmail
from nameko.rpc import rpc, RpcProxy


class Mail(object):
    name = 'mail'

    @rpc
    def send(self, to, subject, contents):
        yag = yagmail.SMTP(user='myname@gmail', password='mypassword')
        yag.send(
            to=to.encode('utf-8'), subject=subject.encode('utf-8'),
            contents=[contents.encode('utf-8')]
        )
        # 贴士: 参照 Dynaconf 设置模块


class Compute(object):
    name = 'compute'
    mail = RpcProxy('mail')

    @rpc
    def compute(self, operation, value, other, email):
        operations = {
            'sum': lambda x, y: int(x) + int(y),
            'mul': lambda x, y: int(x) * int(y),
            'div': lambda x, y: int(x) / int(y),
            'sub': lambda x, y: int(x) - int(y)
        }
        try:
            result = operations[operation](value, other)
        except Exception as e:
            self.mail.send.async(email, 'An error occurred', str(e))
            raise
        else:
            self.mail.send.async(
                email, "Your operation is complete!", "The result is: %s" % result
            )
            return result

运行Nameko RPC 服务

# 在 Shell 中启动并运行服务
nameko run <module_name> 
nameko run server  # module
nameko run server:Compute  # module[:service class]
nameko run server:Compute server:Mail 
nameko run server --broker amqp://guest:guest@localhost
# 在 控制台 输出
starting services: compute, mail
Connected to amqp://guest:**@127.0.0.1:5672//
Connected to amqp://guest:**@127.0.0.1:5672//
^C
stopping services: compute, mail
# 提示
在生产环境中,建议使用 supervisord 替代控制台命令。

# 用 nameko shell 进行测试

# 进入 RPC 客户端, 通过 n.rpc 对象来进行测试
nameko shell --broker amqp://guest:guest@localhost
>>> n.rpc.mail.send("18832023602@163.com", "testing", "Just testing")
>>> n.rpc.compute.compute('sum', 30, 10, "18832023602@163.com")
>>> n.rpc.compute.compute('sub', 30, 10, "18832023602@163.com")
>>> n.rpc.compute.compute('mul', 30, 10, "18832023602@163.com")
>>> n.rpc.compute.compute('div', 30, 10, "18832023602@163.com")

ClusterProxy ServiceProxy MethodProxy

>>> n
<module 'nameko'>
>>> n.rpc
<nameko.standalone.rpc.ClusterProxy object at 0x7fdef64b66d8>
>>> n.rpc.Compute
<nameko.rpc.ServiceProxy object at 0x7fdef460feb8>
>>> n.rpc.compute
<nameko.rpc.ServiceProxy object at 0x7fdef460fe48>
>>> n.rpc.Mail
<nameko.rpc.ServiceProxy object at 0x7fdef460ff28>
>>> n.rpc.mail
<nameko.rpc.ServiceProxy object at 0x7fdef460fe10>

# 在 Flask API 中调用微服务

# app.py
# Flask API

from flask import Flask, request
from flasgger import Swagger
from nameko.standalone.rpc import ClusterRpcProxy

app = Flask(__name__)
Swagger(app)
CONFIG = {'AMQP_URI': "amqp://guest:guest@localhost"}

@app.route('/compute', methods=['POST'])
def compute():
    """
    Micro Service Based Compute and Mail API
    This API is made with Flask, Flasgger and Nameko
    ---
    parameters:
      - name: body
        in: body
        required: true
        schema:
          id: data
          properties:
            operation:
              type: string
              enum:
                - sum
                - mul
                - sub
                - div
            email:
              type: string
            value:
              type: integer
            other:
              type: integer
    responses:
      200:
        description: Please wait the calculation, you'll receive an email with results
    """
    operation = request.json.get('operation')
    value = request.json.get('value')
    other = request.json.get('other')
    email = request.json.get('email')
    msg = "Please wait the calculation, you'll receive an email with results"
    subject = "API Notification"
    with ClusterRpcProxy(CONFIG) as rpc:
        # asynchronously spawning and email notification
        rpc.mail.send.async(email, subject, msg)
        # asynchronously spawning the compute task
        result = rpc.compute.compute.async(operation, value, other, email)
        return msg, 200

if __name__ == '__main__':
    app.run(debug=True)
# 运行 flask 服务
python api.py 
 * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
# 访问 flasgger swagger
http://localhost:5000/apidocs/
http://localhost:5000/apidocs/index.html,
查看 Flasgger 的界面,利用它可以进行 API 的交互并可以发布任务到队列以供服务进行消费。
可以在 shell 中查看到服务的运行日志,打印信息和错误信息。也可以访问 RabbitMQ 控制面板来查看消息在队列中的处理情况。

示例3

参考文档:https://zhuanlan.zhihu.com/p/82924225

使用帮助

nameko run --help

nameko run --help
usage: nameko run [-h] [--config CONFIG] [--broker BROKER]
                  [--backdoor-port BACKDOOR_PORT]
                  module[:service class] [module[:service class] ...]

Run nameko services. Given a python path to a module containing one or more
nameko services, will host and run them. By default this will try to find
classes that look like services (anything with nameko entrypoints), but a
specific service can be specified via ``nameko run module:ServiceClass``.

positional arguments:
  module[:service class]
                        python path to one or more service classes to run

optional arguments:
  -h, --help            show this help message and exit
  --config CONFIG       The YAML configuration file
  --broker BROKER       RabbitMQ broker url
  --backdoor-port BACKDOOR_PORT
                        Specify a port number to host a backdoor, which can be
                        connected to for an interactive interpreter within the
                        running service process using `nameko backdoor`.

 类似资料: