当前位置: 首页 > 工具软件 > Nameko > 使用案例 >

Nameko入门——官方文档学习记录

陆和泰
2023-12-01

  • 开始编辑时间:2022年5月22日
  • 完成撰写时间:2022年5月23日

Nameko入门

1. Nameko介绍

A microservices framework for Python that lets service developers concentrate on application logic and encourages testability.

一种python的微服务架构,让服务开发者专注于应用逻辑,并鼓励可测试性。

2. 开发环境

  • Ubuntu 18.04 LTS
  • python 3.6.9

3. Nameko安装

  1. 安装rabbitmq(因Nameko使用了RabiitMQ内建的AMQP RPC特性)

     sudo apt-get install rabbitmq-server
    
  2. 安装nameko

     pip install nameko
    

4. Nameko手册

4.1. Nameko特性

内建特性:

  • RPC over AMQP(远程程序调用)
  • 异步事件(pub-sub)
  • 简单的HTTP GET和POST
  • Websocket RPC和订阅(实验性的)

可以直接响应RPC信息,在特定操作时发送事件和侦听其它服务的事件。同时为支持AMQP的客户端提供HTTP接口,为Javascript客户端提供websocket接口。

Nameko鼓励”依赖关系注入模式”,这使编译和测试服务变得简洁。

4.2. 关键概念

4.2.1. 服务解析

一个Nameko服务就是一个python类。该类将应用逻辑压缩在类方法(Method)中,并将所有以来声明为属性。

方法通过特殊的入口装饰暴露给外部。如下代码所示(@rpc)

from nameko.rpc import rpc, RpcProxy

class Service:
    name = "service"

    # we depend on the RPC interface of "another_service"
    other_rpc = RpcProxy("another_service")

    @rpc  # `method` is exposed over RPC
    def method(self):
        # application logic goes here
        pass
  • Entrypoints 入口

入口是它们装饰的服务方法的网关,通常监控外部的实体,如消息队列。在相关事件上,入口可能“触发”,其所装饰的方法将在工作线程上执行。

  • 依赖

依赖关系是隐藏不属于核心服务逻辑的代码的机会。将其它服务作为依赖项实现,声明一个依赖,将其作为服务代码与其它所有内容之间的网关。

  • 工作线程

当一个入口触发时,工作线程将创建。一个工作线程就是服务类的一个实例,但该实例依赖项的声明替换为这些依赖项的实例。

4.2.2. 依赖注入

向一个服务添加依赖是声明式的。类的属性是一个声明,而不是工作线程能够实际上使用的接口。

类的属性式一个DependencyProvider. 依赖关系提供一个*get_denpendency()*方法,将其结果注入到新创建的工作线程中。

工作线程的生命周期:

  1. 入口触发
  2. 工作线程实例化
  3. 依赖注入到工作线程
  4. 方法执行
  5. 工作线程销毁
worker = Service()
worker.other_rpc = worker.other_rpc.get_dependency()
worker.method()
del worker

4.2.3. 并发性

Nameko 构建在 eventlet 库之上,该库通过“greenthreads”提供并发。并发模型是具有隐式 yield 的协同例程。每个工作线程都在自己的绿色线程中执行。可以根据每个工作线程等待 I/O 所花费的时间来调整并发工作线程的最大数量。

许多使用套接字且通常被视为线程安全的 C 扩展可能不适用于“greenthreads”。其中包括librabbitmq,MySQLdb等。

4.2.4. 扩展

所有的入口和依赖提供都将作为扩展生效。这是因为入口和依赖都在服务代码以以外,而且髌骨吧是所有的服务都需要它们。

4.2.5. 运行服务

运行服务所需的只是服务类和相关的配置。运行一个或多个服务最简单的方式是使用Nameko CLI:

nameko run module:[ServiceClass]

这个命令将会在module中查找ServiceClass并执行。

  • 服务容器

每一个服务类都被委托给一个ServiceContainer.容器封装服务所需的所有设计功能,并且包含服务类上的所有扩展,实例如下:

from nameko.containers import ServiceContainer

class Service:
    name = "service"

# create a container
container = ServiceContainer(Service, config={})

# ``container.extensions`` exposes all extensions used by the service
service_extensions = list(container.extensions)

# start service
container.start()

# stop service
container.stop()

  • 服务执行器 Sevice Runner

ServiceRunner是包含多个容器的封装,对外暴露同时执行和关闭所封装容器的方法。这也是nameko run内部使用的方法。也可以单独执行ServiceRunner中的某一个容器。实例如下:

from nameko.runners import ServiceRunner
from nameko.testing.utils import get_container

class ServiceA:
    name = "service_a"

class ServiceB:
    name = "service_b"

# create a runner for ServiceA and ServiceB
runner = ServiceRunner(config={})
runner.add_service(ServiceA)
runner.add_service(ServiceB)

# ``get_container`` will return the container for a particular service
container_a = get_container(runner, ServiceA)

# start both services
runner.start()

# stop both services
runner.stop()

4.3. 命令行接口

4.3.1. 运行一个服务

nameko run <module>[:<ServiceClass>]

搜索并执行一个服务类。这将在前台运行这个服务,知道进程终止。

默认配置参数可通过*–config*进行转换

nameko run --config ./footbar.yaml <modul>[:<ServiceClass>]

一个YMAL配置文件示例如下:

AMQP_URI: 'pyamqp://guest:guest@localhost'
WEB_SERVER_ADDRESS: '0.0.0.0:8000'
rpc_exchange: 'nameko-rpc'
max_workers: 10
parent_calls_tracked: 10

LOGGING:
    version: 1
    handlers:
        console:
            class: logging.StreamHandler
    root:
        level: DEBUG
        handlers: [console]

配置的参数值可通过内置的Config接口获取。

4.3.2. 环境变量替换

  • YAML配置文件支持了环境变量。可使用Base风格的格式使用:${ENV_VAR}
  • 使用时可提供默认值:${ENV_VAR:default_value}
  • 并提供递归支持: E N V V A R : d e f a u l t v a l u e : {ENV_VAR:default_value}: ENVVAR:defaultvalue:{OTHER_ENV_VAR:value},实现多层默认值。
# foobar.yaml
AMQP_URI: pyamqp://${RABBITMQ_USER:guest}:${RABBITMQ_PASSWORD:password}@${RABBITMQ_HOST:localhost}
  • 运行服务的同时,配置环境变量
RABBITMQ_USER=user RABBITMQ_PASSWORD=password RABBITMQ_HOST=host nameko run --config ./foobar.yaml <module>[:<ServiceClass>]
  • 如果需要在yaml文件中使用引号,需要添加*!env_var*解析器。
# foobar.yaml
AMQP_URI: !env_var "pyamqp://${RABBITMQ_USER:guest}:${RABBITMQ_PASSWORD:password}@${RABBITMQ_HOST:localhost}"
  • 如果使用原始字符串,需要添加*!raw_env_var*
  • 命令行中初始化环境参数:
# foobar.yaml
...
THINGS: ${A_LIST_OF_THINGS}
A_LIST_OF_THINGS=[A,B,C] nameko run --config ./foobar.yaml <module>[:<ServiceClass>]
# foobar.yaml
LANDING_URL_TEMPLATE: ${LANDING_URL_TEMPLATE:https://example.com/{path}}

4.3.3. 与运行服务相互作用

 nameko shell

启动一个与远程nameko服务相互作用的python shell.先将一个模块n添加到内置的命名空间;nameko shell提供RPC呼叫和事件调度的功能。

  • 对目标服务模块n进行RPC调用:
$ nameko shell
>>> n.rpc.target_service.target_method(...)
# RPC response
  • 事件调度(从源服务发起事件?)
$ nameko shell
>>> n.dispatch_event("source_service", "event_type", "event_payload")

4.4. 内置扩展

4.4.1. RPC

Nameko提供基于AMQO实现的RPC扩展,其由@rpc入口组成,这是一种与其它服务交互的服务代理。非Nameko客户端的可用于对集群进行RPC调用的独立代理。

from nameko.rpc import rpc, RpcProxy

class ServiceY:
    name = "service_y"

    @rpc
    def append_identifier(self, value):
        //对外暴露的接口代理
        return u"{}-y".format(value)


class ServiceX:
    name = "service_x"

    y = RpcProxy("service_y") // 通过name获取服务代理

    @rpc
    def remote_method(self, value):
        res = u"{}-x".format(value)
        return self.y.append_identifier(res)
from nameko.standalone.rpc import ClusterRpcProxy

config = {
    'AMQP_URI': AMQP_URI  # e.g. "pyamqp://guest:guest@localhost"
}

with ClusterRpcProxy(config) as cluster_rpc:
    cluster_rpc.service_x.remote_method("hellø")  # "hellø-x-y" 通过cluster获取某一个服务的接口,并进行远程调用

一般的RPC回调都在远程方法完成后执行,但代理也有后台异步调用模式或者并行RPC调用:

with ClusterRpcProxy(config) as cluster_rpc:
    hello_res = cluster_rpc.service_x.remote_method.call_async("hello")
    world_res = cluster_rpc.service_x.remote_method.call_async("world")
    # do work while waiting  异步调用期间可以执行其它任务
    hello_res.result()  # "hello-x-y"
    world_res.result()  # "world-x-y"

在目标服务具有一个以上实例的集群中,RPC请求在实例之间进行轮询。请求最终将会被目标服务的一个实例处理。

AMQP消息仅会在请求被成功处理后发送确认消息。如果服务无法确认消息并且AMQP连接已关闭,则代理将撤销消息,然后消息分配给可用的服务实例。

请求和回应有效负载被序列化到JSON格式传输。

4.4.2. 事件(Pub-Sub, 发布-订阅)

Nameko 事件是一个异步消息系统,实现了发布和订阅模式。服务调度(发送)由一个或多个服务接收的事件:

from nameko.events import EventDispatcher, event_handler
from nameko.rpc import rpc

class ServiceA:
    """ Event dispatching service. """
    name = "service_a"

    dispatch = EventDispatcher()  //事件分发实例

    @rpc
    def dispatching_method(self, payload):
        // 发布可远程调用
        self.dispatch("event_type", payload)  // 发布 {event_type:payload}


class ServiceB:
    """ Event listening service. """
    name = "service_b"

    @event_handler("service_a", "event_type")
    def handle_event(self, payload):
        print("service b received:", payload)

Eventhandler入口函数具有三个handler_type,决定事件消息将如何从集群中接收。

  • SERVIC_POOL:事件处理程序按服务名称进行池化,每个池中有一个实例接收事件,类似于RPC入口电的群集行为。这是默认的类型。
  • BROADCAST:所有监听服务实例将会收到这个事件;
  • SINGLETON:仅有一个监听服务实例能够接收这个事件。

一个Broadcast实例:(需要在入口函数中指明:handler_type=BROADCAST)

from nameko.events import BROADCAST, event_handler

class ListenerService:
    name = "listener"

    @event_handler(
        "monitor", "ping", handler_type=BROADCAST, reliable_delivery=False
    )
    def ping(self, payload):
        # all running services will respond
        print("pong from {}".format(self.name))

4.4.3. HTTP

HTTP入口函数建立在werkzeug上,支持所有的标准HTTP方法(GET,POST,DELET,PUT等)。可通过逗号分割同时支持多种方法:

默认端口号为:8000

# http.py

import json
from nameko.web.handlers import http

class HttpService:
    name = "http_service"

    @http('GET', '/get/<int:value>')
    def get_method(self, request, value):
        return json.dumps({'value': value})

    @http('POST', '/post')
    def do_post(self, request):
        return u"received: {}".format(request.get_data(as_text=True))

    @http('GET,PUT,POST,DELETE', '/multi')
    def do_multi(self, request):
        return request.method

官方不建议使用namekohttp功能,可直接使用Django。利用库django_nameko中的get_pool直接在Django接口中调用Nameko的微服务。

4.4.4. 定时器

定时器入口能够根据配置的秒数进行不断地触发,是服务实例内进行触发。

from nameko.timer import timer

class Service:
    name ="service"

    @timer(interval=1)
    def ping(self):
        # 服务将会每一秒打印一个 "pong"
        # method executed every second
        print("pong")

4.5. 内建依赖提供

Nameko通过“依赖提供 dependency providers 能够导入配置。

4.5.1. 配置

Config是一个简单的依赖提供器,提供给服务在运行时仅读配置变量的权限。

from nameko.dependency_providers import Config
from nameko.web.handlers import http


class Service:

    name = "test_config"

    config = Config() // 实例化配置

    @property
    def foo_enabled(self):
        return self.config.get('FOO_FEATURE_ENABLED', False) //通过get获取配置参数值

    @http('GET', '/foo')
    def foo(self, request):
        if not self.foo_enabled:
            return 403, "FeatureNotEnabled"

        return 'foo'

4.6. 社区

在核心项目以外,社区提供了大量的nameko扩展和增强库,以方便开发者使用。

4.6.1. 扩展

  1. nameko-sqlalchemy:提供基于SQLAlchemy读写数据库的依赖提供器。这需要一个纯python或者事件集兼容的数据库驱动(pyodbc+odbc driver?)
  2. nameko-sentry:捕获入口函数异常,并将错误信息发送给Sentry服务。
  3. nameko-amqp-retry:允许AMQP入口函数稍后重试
  4. nameko-bayeux-client:带有实现 Bayeux 协议的 Cometd 客户端
  5. nameko-slack: Slack API 扩展。
  6. nameko-enventlog-dispatcher:使用事件方式调度日志数据。
  7. nameko-redis-py:Nameko中Redis依赖和工具
  8. nameko-redis:nameko服务中的redis依赖。
  9. nameko-statsd:统计服务
  10. nameko-twilio:发送短信服务(国外的PaaS)
  11. nameko-snedgrid: 发送邮件
  12. nameko-cachetools:缓存nameko服务间的交互

4.6.2. 增强库

  1. django-nameko: Nameko微服务的Django包装器
  2. flask_nameko: Nameko微服务的Flask包装器
  3. nameko-proxy: Nameko微服务简单异步代理。

4.7. 测试服务

4.7.1. 哲学理念

Nameko的惯例就是将测试设计的尽可能简单。微服务是小且目的单一的;依赖注入使设计功能的替换和隔离变得简单。

4.7.2. 单元测试

Nameko中的单元测试通常指对一个独立的服务进行独立测试,不考虑任何其它依赖。

wroker_factory()根据所给的服务创建一个工程线程,服务的依赖将被mock.MagicMock对象替换。可通过side_effectreturn_value模仿依赖功能。

""" Service unit testing best practice.
"""
from nameko.rpc import RpcProxy, rpc
from nameko.testing.services import worker_factory

class ConversionService(object):
    """ Service under test 用于测试的服务
    """
    name = "conversions"

    maths_rpc = RpcProxy("maths")

    @rpc
    def inches_to_cm(self, inches):
        return self.maths_rpc.multiply(inches, 2.54)

    @rpc
    def cms_to_inches(self, cms):
        return self.maths_rpc.divide(cms, 2.54)


def test_conversion_service():
    # create worker with mock dependencies
    # 创建模拟以来的工作线程
    service = worker_factory(ConversionService)

    # add side effects to the mock proxy to the "maths" service
    # maths是微服务的一个依赖,通过add side effect模拟代理这个微服务。
    service.maths_rpc.multiply.side_effect = lambda x, y: x * y
    service.maths_rpc.divide.side_effect = lambda x, y: x / y

    # test inches_to_cm business logic
    # 测试 微服务的某个接口的逻辑,这个接口中的其它服务依赖,已由add side effect模拟代理。
    # 使用assert进行断言。
    assert service.inches_to_cm(300) == 762
    service.maths_rpc.multiply.assert_called_once_with(300, 2.54)

    # test cms_to_inches business logic
    assert service.cms_to_inches(762) == 300
    service.maths_rpc.divide.assert_called_once_with(762, 2.54)

有些时候,提供可选择的依赖而不是模拟更合适,例如全流程测试,如数据连接。

""" Service unit testing best practice, with an alternative dependency.
"""

import pytest
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

from nameko.rpc import rpc
from nameko.testing.services import worker_factory

# using community extension from http://pypi.python.org/pypi/nameko-sqlalchemy
from nameko_sqlalchemy import Session

Base = declarative_base()


class Result(Base):
    __tablename__ = 'model'
    id = Column(Integer, primary_key=True)
    value = Column(String(64))

class Service:
    """ Service under test
    """
    name = "service"

    db = Session(Base)

    @rpc
    def save(self, value):
        result = Result(value=value)
        self.db.add(result)
        self.db.commit()

@pytest.fixture
def session():
    """ Create a test database and session
    """
    engine = create_engine('sqlite:///:memory:')
    Base.metadata.create_all(engine)
    session_cls = sessionmaker(bind=engine)
    return session_cls()

def test_service(session):

    # create instance, providing the test database session
    service = worker_factory(Service, db=session)

    # verify ``save`` logic by querying the test database
    service.save("helloworld")
    assert session.query(Result.value).all() == [("helloworld",)]

4.7.3. 一体化测试

Nameko中的一体化测试就是测试服务间的接口。推荐的方式是:按照正常方式执行所有的测试服务,然后使用helper触发一个入口函数.

""" Service integration testing best practice.
"""

from nameko.rpc import rpc, RpcProxy
from nameko.testing.utils import get_container
from nameko.testing.services import entrypoint_hook


class ServiceX:
    """ Service under test
    """
    name = "service_x"

    y = RpcProxy("service_y") // 依赖

    @rpc
    def remote_method(self, value):
        res = "{}-x".format(value)
        return self.y.append_identifier(res)


class ServiceY:
    """ Service under test
    """
    name = "service_y"

    @rpc
    def append_identifier(self, value):
        return "{}-y".format(value)


def test_service_x_y_integration(runner_factory, rabbit_config):

    # run services in the normal manner
    # 按照常规方式启动服务
    runner = runner_factory(rabbit_config, ServiceX, ServiceY)
    runner.start()

    # artificially fire the "remote_method" entrypoint on ServiceX
    # 人为触发ServiceX的remote_method入口函数,然后验证返回
    # and verify response
    container = get_container(runner, ServiceX)
    with entrypoint_hook(container, "remote_method") as entrypoint:
        assert entrypoint("value") == "value-x-y"

4.7.4. 其它测试辅助接口

  1. restrict_entrypoints: 停用容器中的入口函数。需要在容器启动以前执行。
  2. replace_dependencies: 替换容器中的依赖提供为一个MockDependencyProvider。替换是在容器上进行的,对服务类没有影响。
  3. entrypoint_hook: 允许服务的入口函数手动调用。
  4. entrypoint_waiter: 上下文管理器,再异步服务触发,未完成之前不会退出。

Nameko的测试部分nameko.testing.pytest是基于pytest实现的,如果了解pytest,将能够进行一些有用的配置。

4.8. 撰写Nameko扩展

4.8.1. 概述

Nameko的扩展组件需要继承于nameko.extensions.Extension。这部分内容较为深,不放在此处讨论。

 类似资料: