当前位置: 首页 > 工具软件 > Nameko > 使用案例 >

Nameko中文教程

曾河
2023-12-01

1.Nameko是什么

一个让python程序员关注应用逻辑和测试的微服务框架。

安装

安装Nameko

pip instal nameko

安装nameko需要的依赖

sudo apt install rabbitmq-server

入门程序

服务端

# helloworld.py

from nameko.rpc import rpc

class GreetingService:
    name = "greeting_service" # 自定义服务名称

    @rpc #入口点标记
    def hello(self, name):
        return "Hello, {}!".format(name)

运行服务

nameko run helloworld

正常输出

starting services: greeting_service
Connected to amqp://guest:**@127.0.0.1:5672//

命令行客户端

进入nameko交互模式

nameko shell

发起客户端请求

n.rpc.greeting_service.hello(name="hello")

正常输出

u'Hello, hello!'

2.核心概念

服务

一个服务就是一个python类。这个类把服务逻辑封装到方法中,而且把任何的依赖都作为方法的参数。原因很简单,微服务的核心的就是解耦。
进入点
进入点可以简单理解为带有@rpc标记的方法所关联的服务入口。在方法上使用了@rpc修饰的方法都将暴露给外部业务。这些进入点一般会会监视外部事件。例如一个消息队列中的消息事件,将触发进入点修饰的方法执行并返回结果。
依赖
官方提出所有非服务核心逻辑的实现最好都以依赖的形式实现。
依赖其实是隐藏代码的一种很好的方式。
使用依赖时应该把所有依赖都进行声明。
工作器
工作器就是进入点发放被触发的时候产生的微服务类实例。但是如果有依赖,那么就会被依赖的实例代替。
一个工作器实例只处理一次请求,提供的是无状态服务。
一个服务可以同时运行多个工作器,但最多只能是用户预定义的并发值。

依赖注入

服务类的依赖添加是声明式的。声明时不是使用接口,而是通过使用参数进行声明。
这个参数是一个DependencyProvider。这个东西负责提供注入到服务工作器的对象。
所有的provider都要提供get_dependency()方法生产要注入到工作器中的对象。

工作器生命周期:

1.进入点触发
2.通过服务类初始化工作器
3.依赖出入到工作器
4.执行方法
5.工作器销毁

伪代码:

worker = Service()
worker.other_rpc = worker.other_rpc.get_dependency()
worker.method()
del worker

依赖提供者在服务的持续时间内存活,而注入的依赖项对于每个工作器来说都是惟一的。

同步

Nameko基于eventlet库,这个库实现的同步模型是基于隐式yield模式的协程,通过“绿色的线程”提供同步功能。

隐式的yield基于monkey patching基础库。当一个线程等待IO时就会触发yield。通过命令==nameko run==启动的服务将会应用这个模式。

每一个工作器都有自己的线程。最大的同步工作器数量可以基于每个工作器等待IO时间的总量来动态调整。

工作器都是无状态的所以天生线程安全。但是外部依赖应该确保他们每个工作线程都是用同一个依赖或者多个工作器都能安全地同步访问。

然而c扩展体系都使用socket通信,他们通常都不认为绿色线程的工作能满足线程安全。其中就包括 librabbitmq, MySQLdb等。

扩展

所有的入口点和依赖提供者都作为“扩展”实现。因为他们存在于服务代码之外,又不是所有服务都需要的。(例如一个纯的AMQP暴露的服务将不会使用HTTP入口点)

Nameko有大量的内建扩展,一些是有社区提供的,而你也可以实现自己的扩展。

运行服务

运行服务需要的所有东西:服务类和有关的配置。
最简单的运行一个或者多个服务的方法是使用Nameko命令行:

nameko run module:[ServiceClass]

这里的意思是运行某module下的所有服务或者运行某module下的特定的ServiceClass服务。

服务容器

每个服务类都委托给一个ServiceContainer。这个容器封装了所有需要运行一个服务的方法,而且装载了在服务类上的任何扩展。

使用ServiceContainer运行单个服务:

from nameko.containers import ServiceContainer

class Service:
    name = "service"

# create a container
container = ServiceContainer(Service, config={})

# ``container.extensions`` exposes all extensions used by the service
service_extensions = list(container.extensions)

# start service
container.start()

# stop service
container.stop()

服务运行器

ServiceRunner 是多个服务容器的简单包装,同时提供启动和停止所有包装容器的方法。这个其实是nameko run内部使用的,但这也能实现程序化控制。

from nameko.runners import ServiceRunner
from nameko.testing.utils import get_container

class ServiceA:
    name = "service_a"

class ServiceB:
    name = "service_b"

# create a runner for ServiceA and ServiceB
runner = ServiceRunner(config={})
runner.add_service(ServiceA)
runner.add_service(ServiceB)

# ``get_container`` will return the container for a particular service
container_a = get_container(runner, ServiceA)

# start both services
runner.start()

# stop both services
runner.stop()

3.命令行接口

Nameko 提供了一个命令行接口尽可能方便地托管服务及与服务进行交互。

运行服务

nameko run <module>[:<ServiceClass>]

发现并运行服务类。这个命令将会在前台启动服务并且运行到进程终止。

也可以用–config选项重写默认配置,并提供一个YAML 格式的配置文件

nameko run --config ./foobar.yaml <module>[:<ServiceClass>]
# foobar.yaml

AMQP_URI: 'pyamqp://guest:guest@localhost'
WEB_SERVER_ADDRESS: '0.0.0.0:8000'
rpc_exchange: 'nameko-rpc'
max_workers: 10
parent_calls_tracked: 10

LOGGING:
    version: 1
    handlers:
        console:
            class: logging.StreamHandler
    root:
        level: DEBUG
        handlers: [console]

LOGGING 项会传递到logging.config.dictConfig(),而且会适应调用的样式。

配置值可以通过内建的Config依赖提供器读取。

环境变量解决方案

YAML配置文件为环境变量提供了基本的支持。可以使用bash风格的语法:${ENV_VAR},另外还可以提供默认值${ENV_VAR:default_value}

# foobar.yaml
AMQP_URI: pyamqp://${RABBITMQ_USER:guest}:${RABBITMQ_PASSWORD:password}@${RABBITMQ_HOST:localhost}

使用环境变量的运行方式示例

$ RABBITMQ_USER=user RABBITMQ_PASSWORD=password RABBITMQ_HOST=host nameko run --config ./foobar.yaml <module>[:<ServiceClass>]

如果需要在YAML文件里使用引号(引号里使用环境变量),显式声明!env_var处理器是必须的。

# foobar.yaml
AMQP_URI: !env_var "pyamqp://${RABBITMQ_USER:guest}:${RABBITMQ_PASSWORD:password}@${RABBITMQ_HOST:localhost}"

与运行的服务进行交互

nameko shell

上述命令是为了与远程服务工作,运行了一个交互式python脚本环境。这是规范的交互式解释器,提供了一个添加到了内建命名空间的特殊的模块n,以支持RPC调用和分发事件。

发起RPC到目标服务
$ nameko shell
>>> n.rpc.target_service.target_method(...)
# RPC response
作为源服务分发事件
$ nameko shell
>>> n.dispatch_event("source_service", "event_type", "event_payload")

4.内建的扩展

RPC

Nameko包含了一个基于AMQP的RPC实现。它包括@rpc入口点,一个与其他服务对话的代理,以及一个非Nameko客户端也能发起RPC调用到集群的独立的代理

from nameko.rpc import rpc, RpcProxy

class ServiceY:
    name = "service_y"

    @rpc
    def append_identifier(self, value):
        return u"{}-y".format(value)


class ServiceX:
    name = "service_x"

    y = RpcProxy("service_y")

    @rpc
    def remote_method(self, value):
        res = u"{}-x".format(value)
        return self.y.append_identifier(res)
from nameko.standalone.rpc import ClusterRpcProxy

config = {
    'AMQP_URI': AMQP_URI  # e.g. "pyamqp://guest:guest@localhost"
}

with ClusterRpcProxy(config) as cluster_rpc:
    cluster_rpc.service_x.remote_method("hellø")  # "hellø-x-y"

一般的RPC调用会一直阻塞直到远程方法完成为止。但是代理也提供了一个异步调用模式到后台或者并行化RPC调用:

with ClusterRpcProxy(config) as cluster_rpc:
    hello_res = cluster_rpc.service_x.remote_method.call_async("hello")
    world_res = cluster_rpc.service_x.remote_method.call_async("world")
    # do work while waiting
    hello_res.result()  # "hello-x-y"
    world_res.result()  # "world-x-y"

在一个集群里面拥有超过一个的目标服务的实例,RPC请求在这些实例间循环。请求只会由目标服务中的一个实例来处理。

AMQP消息只会在请求被成功处理后才被确认。如果服务确认消息失败,AMQP连接关闭(例如服务进程被杀死)broker将重试调用然后分发消息到可用的服务实例上。

请求和相应的负载为了通过网线传输而被序列化到JSON

事件(发布订阅)

Nameko 事件是一个异步的消息系统,实现了发布订阅模式。服务分发事件,而这些事件可以被0到多个的其他服务所接收。

from nameko.events import EventDispatcher, event_handler
from nameko.rpc import rpc

class ServiceA:
    """ Event dispatching service. """
    name = "service_a"

    dispatch = EventDispatcher()

    @rpc
    def dispatching_method(self, payload):
        self.dispatch("event_type", payload)


class ServiceB:
    """ Event listening service. """
    name = "service_b"

    @event_handler("service_a", "event_type")
    def handle_event(self, payload):
        print("service b received:", payload)

EventHandler进入点有三个处理器类型决定事件消息是如何被一个集群接收的:
SERVICE_POOL:所有事件处理器通过服务名称联合在一起,并且每个池中的只有一个实例接受到事件,类似于RPC进入点的集群行为。这是默认的处理类型。
BROADCAST:每个监听服务实例都会接收到事件。
SINGLETON:只有一个监听服务实例会接收到事件。

广播的例子:

from nameko.events import BROADCAST, event_handler

class ListenerService:
    name = "listener"

    @event_handler(
        "monitor", "ping", handler_type=BROADCAST, reliable_delivery=False
    )
    def ping(self, payload):
        # all running services will respond
        print("pong from {}".format(self.name))

为了通过网络传输,事件都被序列化成了JSON。

HTTP GET & POST

Nameko的HTTP进入点 支持简单的GET和POST

# http.py

import json
from nameko.web.handlers import http

class HttpService:
    name = "http_service"

    @http('GET', '/get/<int:value>')
    def get_method(self, request, value):
        return json.dumps({'value': value})

    @http('POST', '/post')
    def do_post(self, request):
        return u"received: {}".format(request.get_data(as_text=True))

启动服务

$nameko run http
starting services: http_service

测试服务

$ curl -i localhost:8000/get/42
HTTP/1.1 200 OK
Content-Type: text/plain; charset=utf-8
Content-Length: 13
Date: Fri, 13 Feb 2015 14:51:18 GMT

{'value': 42}
$ curl -i -d "post body" localhost:8000/post
HTTP/1.1 200 OK
Content-Type: text/plain; charset=utf-8
Content-Length: 19
Date: Fri, 13 Feb 2015 14:55:01 GMT

received: post body

HTTP进入点是基于werkzeug库的。服务方法必须返回以下的任一值:

一个字符串,将变成响应实体response body
一个二元组,(status code, response body)
一个三元组,(status_code, headers dict, response body)
一个werkzeug.wrappers.Response实例
# advanced_http.py

from nameko.web.handlers import http
from werkzeug.wrappers import Response

class Service:
    name = "advanced_http_service"

    @http('GET', '/privileged')
    def forbidden(self, request):
        return 403, "Forbidden"

    @http('GET', '/headers')
    def redirect(self, request):
        return 201, {'Location': 'https://www.example.com/widget/1'}, ""

    @http('GET', '/custom')
    def custom(self, request):
        return Response("payload")

运行服务

$ nameko run advanced_http
starting services: advanced_http_service

测试:

$ curl -i localhost:8000/privileged
HTTP/1.1 403 FORBIDDEN
Content-Type: text/plain; charset=utf-8
Content-Length: 9
Date: Fri, 13 Feb 2015 14:58:02 GMT
curl -i localhost:8000/headers
HTTP/1.1 201 CREATED
Location: https://www.example.com/widget/1
Content-Type: text/plain; charset=utf-8
Content-Length: 0
Date: Fri, 13 Feb 2015 14:58:48 GMT

可以通过重写response_from_exception()方法格式化控制的错误返回(服务器异常控制)。

import json
from nameko.web.handlers import HttpRequestHandler
from werkzeug.wrappers import Response
from nameko.exceptions import safe_for_serialization


class HttpError(Exception):
    error_code = 'BAD_REQUEST'
    status_code = 400


class InvalidArgumentsError(HttpError):
    error_code = 'INVALID_ARGUMENTS'

#重写异常处理
class HttpEntrypoint(HttpRequestHandler):
    def response_from_exception(self, exc):
        if isinstance(exc, HttpError):
            response = Response(
                json.dumps({
                    'error': exc.error_code,
                    'message': safe_for_serialization(exc),
                }),
                status=exc.status_code,
                mimetype='application/json'
            )
            return response
        return HttpRequestHandler.response_from_exception(self, exc)


http = HttpEntrypoint.decorator


class Service:
    name = "http_service"

    @http('GET', '/custom_exception')
    def custom_exception(self, request):
        raise InvalidArgumentsError("Argument `foo` is required.")

运行

$ nameko run http_exceptions
starting services: http_service

测试

$ curl -i http://localhost:8000/custom_exception
HTTP/1.1 400 BAD REQUEST
Content-Type: application/json
Content-Length: 72
Date: Thu, 06 Aug 2015 09:53:56 GMT

{"message": "Argument `foo` is required.", "error": "INVALID_ARGUMENTS"}

计时器

计时器是一个每达到可配置的秒数时刻就触发的简单的入口点。计时器是非集群定制的,而且在所有的服务实例都会触发。

from nameko.timer import timer

class Service:
    name ="service"

    @timer(interval=1)
    def ping(self):
        # method executed every second
        print("pong")

5.内建的Dependency Providers

Nameko包含了一些常用的Dependency Providers。

Config

Config是一个简单的依赖提供器,提供了在运行时只读读取配置值的能力。

from nameko.dependency_providers import Config
from nameko.web.handlers import http


class Service:

    name = "test_config"

    config = Config()

    @property
    def foo_enabled(self):
        return self.config.get('FOO_FEATURE_ENABLED', False)

    @http('GET', '/foo')
    def foo(self, request):
        if not self.foo_enabled:
            return 403, "FeatureNotEnabled"

        return 'foo'

6.社区支持(Community)

社区有大量不是核心项目但你会发现在开发自己的nameko服务是很有用的nameko扩展和补充的库。

扩展

nameko-sqlalchemy
nameko-sentry
nameko-amqp-retry
nameko-bayeux-client
nameko-slack
nameko-eventlog-dispatcher
nameko-redis-py
nameko-redis
nameko-statsd

补充库

django-nameko
flask_nameko
nameko-proxy

7.测试服务

哲学

Nameko规约设计得很容易进行测试。服务可能很小而且功能单一,而且依赖注入使得它很简单就可以替换及分离函数的片段。

nameko自己的测试套件使用pytest库。

单元测试

单元测试通常意味着分离地测试一个单一的服务。例如没有了任何或者大部分的依赖。

worker_factory()工具将从一个服务类创建工作器,并使用mock.MagicMock创建的对象替换掉原来的依赖。 依赖函数可以通过 side_effect和 return_value伪造。

""" Service unit testing best practice.
"""

from nameko.rpc import RpcProxy, rpc
from nameko.testing.services import worker_factory


class ConversionService(object):
    """ Service under test
    """
    name = "conversions"

    maths_rpc = RpcProxy("maths")

    @rpc
    def inches_to_cm(self, inches):
        return self.maths_rpc.multiply(inches, 2.54)

    @rpc
    def cms_to_inches(self, cms):
        return self.maths_rpc.divide(cms, 2.54)


def test_conversion_service():
    # create worker with mock dependencies
    service = worker_factory(ConversionService)

    # add side effects to the mock proxy to the "maths" service
    service.maths_rpc.multiply.side_effect = lambda x, y: x * y
    service.maths_rpc.divide.side_effect = lambda x, y: x / y

    # test inches_to_cm business logic
    assert service.inches_to_cm(300) == 762
    service.maths_rpc.multiply.assert_called_once_with(300, 2.54)

    # test cms_to_inches business logic
    assert service.cms_to_inches(762) == 300
    service.maths_rpc.divide.assert_called_once_with(762, 2.54)

有些情况下使用一个代替性的依赖比伪造依赖更有用。这个可能是一个全功能的替换(例如一个测试数据库会话)或者一个轻量级的提供部分功能的垫片。

""" Service unit testing best practice, with an alternative dependency.
"""

import pytest
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

from nameko.rpc import rpc
from nameko.testing.services import worker_factory

# using community extension from http://pypi.python.org/pypi/nameko-sqlalchemy
from nameko_sqlalchemy import Session


Base = declarative_base()


class Result(Base):
    __tablename__ = 'model'
    id = Column(Integer, primary_key=True)
    value = Column(String(64))


class Service:
    """ Service under test
    """
    name = "service"

    db = Session(Base)

    @rpc
    def save(self, value):
        result = Result(value=value)
        self.db.add(result)
        self.db.commit()


@pytest.fixture
def session():
    """ Create a test database and session
    """
    engine = create_engine('sqlite:///:memory:')
    Base.metadata.create_all(engine)
    session_cls = sessionmaker(bind=engine)
    return session_cls()


def test_service(session):

    # create instance, providing the test database session
    service = worker_factory(Service, db=session)

    # verify ``save`` logic by querying the test database
    service.save("helloworld")
    assert session.query(Result.value).all() == [("helloworld",)]

集成测试

在nameko中集成测试意味着在数个服务间测试接口。建议的方法是以正常的方式运行所有的被测试的服务,然后使用帮助类触发入口点的行为。

""" Service integration testing best practice.
"""

from nameko.rpc import rpc, RpcProxy
from nameko.testing.utils import get_container
from nameko.testing.services import entrypoint_hook


class ServiceX:
    """ Service under test
    """
    name = "service_x"

    y = RpcProxy("service_y")

    @rpc
    def remote_method(self, value):
        res = "{}-x".format(value)
        return self.y.append_identifier(res)


class ServiceY:
    """ Service under test
    """
    name = "service_y"

    @rpc
    def append_identifier(self, value):
        return "{}-y".format(value)


def test_service_x_y_integration(runner_factory, rabbit_config):

    # run services in the normal manner
    runner = runner_factory(rabbit_config, ServiceX, ServiceY)
    runner.start()

    # artificially fire the "remote_method" entrypoint on ServiceX
    # and verify response
    container = get_container(runner, ServiceX)
    with entrypoint_hook(container, "remote_method") as entrypoint:
        assert entrypoint("value") == "value-x-y"

注意这里在ServiceX 和ServiceY之间的接口就像在正常操作一样。

对于一个特殊的测试,接口如果在超出测试范围的(与测试无关的),可以使用下面的其中一个测试帮助器进行禁用。

restrict_entrypoints
nameko.testing.services.restrict_entrypoints(container, *entrypoints)

限制在container 的进入点为特定的名称的进入点。

这些方法必须在容器启动前被调用。

用法

下面的服务定义有两个进入点:

class Service(object):
    name = "service"

    @timer(interval=1)
    def foo(self, arg):
        pass

    @rpc
    def bar(self, arg)
        pass

    @rpc
    def baz(self, arg):
        pass

container = ServiceContainer(Service, config)

禁用在foo上的定时器进入点,只留下RPC进入点

restrict_entrypoints(container, "bar", "baz")

注意不可能单独地把多个进入点看成同一个方法。

replace_dependencies
nameko.testing.services.replace_dependencies(container, *dependencies, **dependency_map)

替换依赖提供器在容器上使用伪造的依赖提供器。

在 * dependencies声明的依赖将由一个MockDependencyProvider替换,这个MockDependencyProvider会注入一个魔术伪造器而不是依赖。

另外,你可能使用关键字参数命名依赖,而且提供MockDependencyProvider应该注入的代替的值。

为每个在 * dependencies声明的依赖返回MockDependencyProvider.dependency,这样对替换的依赖的调用就能被检查出来。如果只有一个依赖被替换,则返回一个单一的对象,否则由生成器yield出在 * dependencies声明的依赖。注意任何在 * * dependency_map指出的替换的依赖都不会被返回。

替换是在容器实例里执行的,而且对服务类没有影响。所以新的容器实例不会影响旧的容器实例。

用法

from nameko.rpc import RpcProxy, rpc
from nameko.standalone.rpc import ServiceRpcProxy

class ConversionService(object):
    name = "conversions"

    maths_rpc = RpcProxy("maths")

    @rpc
    def inches_to_cm(self, inches):
        return self.maths_rpc.multiply(inches, 2.54)

    @rpc
    def cm_to_inches(self, cms):
        return self.maths_rpc.divide(cms, 2.54)

container = ServiceContainer(ConversionService, config)
mock_maths_rpc = replace_dependencies(container, "maths_rpc")
mock_maths_rpc.divide.return_value = 39.37

container.start()

with ServiceRpcProxy('conversions', config) as proxy:
    proxy.cm_to_inches(100)

# assert that the dependency was called as expected
mock_maths_rpc.divide.assert_called_once_with(100, 2.54)

通过关键字指定特殊的替换

class StubMaths(object):

    def divide(self, val1, val2):
        return val1 / val2

replace_dependencies(container, maths_rpc=StubMaths())

container.start()

with ServiceRpcProxy('conversions', config) as proxy:
    assert proxy.cm_to_inches(127) == 50.0

完整的例子

使用两个范围限制帮助器

"""
This file defines several toy services that interact to form a shop of the
famous ACME Corporation. The AcmeShopService relies on the StockService,
InvoiceService and PaymentService to fulfil its orders. They are not best
practice examples! They're minimal services provided for the test at the
bottom of the file.

``test_shop_integration`` is a full integration test of the ACME shop
"checkout flow". It demonstrates how to test the multiple ACME services in
combination with each other, including limiting service interactions by
replacing certain entrypoints and dependencies.
"""

from collections import defaultdict

import pytest

from nameko.extensions import DependencyProvider
from nameko.events import EventDispatcher, event_handler
from nameko.exceptions import RemoteError
from nameko.rpc import rpc, RpcProxy
from nameko.standalone.rpc import ServiceRpcProxy
from nameko.testing.services import replace_dependencies, restrict_entrypoints
from nameko.testing.utils import get_container
from nameko.timer import timer


class NotLoggedInError(Exception):
    pass


class ItemOutOfStockError(Exception):
    pass


class ItemDoesNotExistError(Exception):
    pass


class ShoppingBasket(DependencyProvider):
    """ A shopping basket tied to the current ``user_id``.
    """
    def __init__(self):
        self.baskets = defaultdict(list)

    def get_dependency(self, worker_ctx):

        class Basket(object):
            def __init__(self, basket):
                self._basket = basket
                self.worker_ctx = worker_ctx

            def add(self, item):
                self._basket.append(item)

            def __iter__(self):
                for item in self._basket:
                    yield item

        try:
            user_id = worker_ctx.data['user_id']
        except KeyError:
            raise NotLoggedInError()
        return Basket(self.baskets[user_id])


class AcmeShopService:
    name = "acmeshopservice"

    user_basket = ShoppingBasket()
    stock_rpc = RpcProxy('stockservice')
    invoice_rpc = RpcProxy('invoiceservice')
    payment_rpc = RpcProxy('paymentservice')

    fire_event = EventDispatcher()

    @rpc
    def add_to_basket(self, item_code):
        """ Add item identified by ``item_code`` to the shopping basket.

        This is a toy example! Ignore the obvious race condition.
        """
        stock_level = self.stock_rpc.check_stock(item_code)
        if stock_level > 0:
            self.user_basket.add(item_code)
            self.fire_event("item_added_to_basket", item_code)
            return item_code

        raise ItemOutOfStockError(item_code)

    @rpc
    def checkout(self):
        """ Take payment for all items in the shopping basket.
        """
        total_price = sum(self.stock_rpc.check_price(item)
                          for item in self.user_basket)

        # prepare invoice
        invoice = self.invoice_rpc.prepare_invoice(total_price)

        # take payment
        self.payment_rpc.take_payment(invoice)

        # fire checkout event if prepare_invoice and take_payment succeeded
        checkout_event_data = {
            'invoice': invoice,
            'items': list(self.user_basket)
        }
        self.fire_event("checkout_complete", checkout_event_data)
        return total_price


class Warehouse(DependencyProvider):
    """ A database of items in the warehouse.

    This is a toy example! A dictionary is not a database.
    """
    def __init__(self):
        self.database = {
            'anvil': {
                'price': 100,
                'stock': 3
            },
            'dehydrated_boulders': {
                'price': 999,
                'stock': 12
            },
            'invisible_paint': {
                'price': 10,
                'stock': 30
            },
            'toothpicks': {
                'price': 1,
                'stock': 0
            }
        }

    def get_dependency(self, worker_ctx):
        return self.database


class StockService:
    name = "stockservice"

    warehouse = Warehouse()

    @rpc
    def check_price(self, item_code):
        """ Check the price of an item.
        """
        try:
            return self.warehouse[item_code]['price']
        except KeyError:
            raise ItemDoesNotExistError(item_code)

    @rpc
    def check_stock(self, item_code):
        """ Check the stock level of an item.
        """
        try:
            return self.warehouse[item_code]['stock']
        except KeyError:
            raise ItemDoesNotExistError(item_code)

    @rpc
    @timer(100)
    def monitor_stock(self):
        """ Periodic stock monitoring method. Can also be triggered manually
        over RPC.

        This is an expensive process that we don't want to exercise during
        integration testing...
        """
        raise NotImplementedError()

    @event_handler('acmeshopservice', "checkout_complete")
    def dispatch_items(self, event_data):
        """ Dispatch items from stock on successful checkouts.

        This is an expensive process that we don't want to exercise during
        integration testing...
        """
        raise NotImplementedError()


class AddressBook(DependencyProvider):
    """ A database of user details, keyed on user_id.
    """
    def __init__(self):
        self.address_book = {
            'wile_e_coyote': {
                'username': 'wile_e_coyote',
                'fullname': 'Wile E Coyote',
                'address': '12 Long Road, High Cliffs, Utah',
            },
        }

    def get_dependency(self, worker_ctx):
        def get_user_details():
            try:
                user_id = worker_ctx.data['user_id']
            except KeyError:
                raise NotLoggedInError()
            return self.address_book.get(user_id)
        return get_user_details


class InvoiceService:
    name = "invoiceservice"

    get_user_details = AddressBook()

    @rpc
    def prepare_invoice(self, amount):
        """ Prepare an invoice for ``amount`` for the current user.
        """
        address = self.get_user_details().get('address')
        fullname = self.get_user_details().get('fullname')
        username = self.get_user_details().get('username')

        msg = "Dear {}. Please pay ${} to ACME Corp.".format(fullname, amount)
        invoice = {
            'message': msg,
            'amount': amount,
            'customer': username,
            'address': address
        }
        return invoice


class PaymentService:
    name = "paymentservice"

    @rpc
    def take_payment(self, invoice):
        """ Take payment from a customer according to ``invoice``.

        This is an expensive process that we don't want to exercise during
        integration testing...
        """
        raise NotImplementedError()

# =============================================================================
# Begin test
# =============================================================================


@pytest.yield_fixture
def rpc_proxy_factory(rabbit_config):
    """ Factory fixture for standalone RPC proxies.

    Proxies are started automatically so they can be used without a ``with``
    statement. All created proxies are stopped at the end of the test, when
    this fixture closes.
    """
    all_proxies = []

    def make_proxy(service_name, **kwargs):
        proxy = ServiceRpcProxy(service_name, rabbit_config, **kwargs)
        all_proxies.append(proxy)
        return proxy.start()

    yield make_proxy

    for proxy in all_proxies:
        proxy.stop()


def test_shop_checkout_integration(
    rabbit_config, runner_factory, rpc_proxy_factory
):
    """ Simulate a checkout flow as an integration test.

    Requires instances of AcmeShopService, StockService and InvoiceService
    to be running. Explicitly replaces the rpc proxy to PaymentService so
    that service doesn't need to be hosted.

    Also replaces the event dispatcher dependency on AcmeShopService and
    disables the timer entrypoint on StockService. Limiting the interactions
    of services in this way reduces the scope of the integration test and
    eliminates undesirable side-effects (e.g. processing events unnecessarily).
    """
    context_data = {'user_id': 'wile_e_coyote'}
    shop = rpc_proxy_factory('acmeshopservice', context_data=context_data)

    runner = runner_factory(
        rabbit_config, AcmeShopService, StockService, InvoiceService)

    # replace ``event_dispatcher`` and ``payment_rpc``  dependencies on
    # AcmeShopService with ``MockDependencyProvider``\s
    shop_container = get_container(runner, AcmeShopService)
    fire_event, payment_rpc = replace_dependencies(
        shop_container, "fire_event", "payment_rpc")

    # restrict entrypoints on StockService
    stock_container = get_container(runner, StockService)
    restrict_entrypoints(stock_container, "check_price", "check_stock")

    runner.start()

    # add some items to the basket
    assert shop.add_to_basket("anvil") == "anvil"
    assert shop.add_to_basket("invisible_paint") == "invisible_paint"

    # try to buy something that's out of stock
    with pytest.raises(RemoteError) as exc_info:
        shop.add_to_basket("toothpicks")
    assert exc_info.value.exc_type == "ItemOutOfStockError"

    # provide a mock response from the payment service
    payment_rpc.take_payment.return_value = "Payment complete."

    # checkout
    res = shop.checkout()

    total_amount = 100 + 10
    assert res == total_amount

    # verify integration with mocked out payment service
    payment_rpc.take_payment.assert_called_once_with({
        'customer': "wile_e_coyote",
        'address': "12 Long Road, High Cliffs, Utah",
        'amount': total_amount,
        'message': "Dear Wile E Coyote. Please pay $110 to ACME Corp."
    })

    # verify events fired as expected
    assert fire_event.call_count == 3


if __name__ == "__main__":
    import sys
    pytest.main(sys.argv)

其他的帮助方法

entrypoint_hook
提供了context_data 模仿特殊调用上下文的方法。
import pytest

from nameko.contextdata import Language
from nameko.rpc import rpc
from nameko.testing.services import entrypoint_hook


class HelloService:
    """ Service under test
    """
    name = "hello_service"

    language = Language()

    @rpc
    def hello(self, name):
        greeting = "Hello"
        if self.language == "fr":
            greeting = "Bonjour"
        elif self.language == "de":
            greeting = "Gutentag"

        return "{}, {}!".format(greeting, name)


@pytest.mark.parametrize("language, greeting", [
    ("en", "Hello"),
    ("fr", "Bonjour"),
    ("de", "Gutentag"),
])
def test_hello_languages(language, greeting, container_factory, rabbit_config):

    container = container_factory(HelloService, rabbit_config)
    container.start()

    context_data = {'language': language}
    with entrypoint_hook(container, 'hello', context_data) as hook:
        assert hook("Matt") == "{}, Matt!".format(greeting)
entrypoint_waiter

提供了控阻塞调用的测试方法。

from nameko.events import event_handler
from nameko.standalone.events import event_dispatcher
from nameko.testing.services import entrypoint_waiter


class ServiceB:
    """ Event listening service.
    """
    name = "service_b"

    @event_handler("service_a", "event_type")
    def handle_event(self, payload):
        print("service b received", payload)


def test_event_interface(container_factory, rabbit_config):

    container = container_factory(ServiceB, rabbit_config)
    container.start()

    dispatch = event_dispatcher(rabbit_config)

    # prints "service b received payload" before "exited"
    with entrypoint_waiter(container, 'handle_event'):
        dispatch("service_a", "event_type", "payload")
    print("exited")

8.编写扩展

结构

所有的扩展都是nameko.extensions.Extension的子类。这个基类提供了基本的扩展的架构,特别是下面的方法可以被重写以添加特定的功能。

Extension.setup()
在容器启动前调用绑定的扩展。
扩展应该在这里做任何必要的初始化工作。
Extension.start()
当容器成功启动时调用绑定的扩展。
这个方法只会在所有的的扩展都成功调用setup后才会调用。如果扩展影响外部事件,那么它现在就会开始影响它们。
Extension.stop()
在容器开始关闭时调用。
所有扩展应该优雅地在这里关闭。

编写依赖提供器

所有的依赖提供器都应该是nameko.extensions.DependencyProvider的子类,并实现了get_dependency()方法返回一个注入到服务工作器中的一个实例。

推荐的做法是为依赖注入最小的必须的接口。这将减少测试面也更容易由测试代码覆盖。

依赖提供器也会绑定到工作器的生命周期。下面的三个方法会为每个工作器的所有依赖提供器中被调用

DependencyProvider.worker_setup(worker_ctx)
在服务工作器工作之前执行一个任务。
依赖应该在这里做任何的预处理,在遇到失败事件是抛出异常。
参数:worker_ctx : WorkerContext
nameko.containers.ServiceContainer.spawn_worker
DependencyProvider.worker_result(worker_ctx, result=None, exc_info=None)
在服务工作器执行获得结果后调用。
依赖应该在这里处理结果。这个方法会在任何工作器完成工作时为所有的依赖实例进行调用。
例如:一个数据库会话依赖可能会输出缓存的事务
DependencyProvider.worker_teardown(worker_ctx)
在服务工作器完成任务后调用。
依赖可以在这里做任何的后处理,在出现失败事件时抛出异常。
例如:一个数据库会话可能会提交会话。

同步和线程安全

由get_dependency()返回的对象应该是线程安全的,因为它将会被多个同步的正在运行的工作器访问。

在执行服务的相同的线程中,工作器生命周期方法会被调用。这意味着,你能定义线程本地变量,而且通过每个方法访问他们。

例子

一个简单的DependencyProvider 发送消息到SQS队列

from nameko.extensions import DependencyProvider

import boto3

class SqsSend(DependencyProvider):

    def __init__(self, url, region="eu-west-1", **kwargs):
        self.url = url
        self.region = region
        super(SqsSend, self).__init__(**kwargs)

    def setup(self):
        self.client = boto3.client('sqs', region_name=self.region)

    def get_dependency(self, worker_ctx):

        def send_message(payload):
            # assumes boto client is thread-safe for this action, because it
            # happens inside the worker threads
            self.client.send_message(
                QueueUrl=self.url,
                MessageBody=payload
            )
        return send_message

写入口点

你可以实现一个新的入口点扩展,如果你想为初始化服务支持新的传输或机制。

一个入口点最少的要求
1.继承nameko.extensions.Entrypoint
2.实现start()方法,当容器启动时启动入口点。如果后台线程是必要的,推荐做法是使用一个由服务线程管理的线程。
3.在合适的时候调用spawn_worker()绑定容器

例子

一个从SQS队列接收消息的简单入口点。

from nameko.extensions import Entrypoint
from functools import partial

import boto3


class SqsReceive(Entrypoint):

    def __init__(self, url, region="eu-west-1", **kwargs):
        self.url = url
        self.region = region
        super(SqsReceive, self).__init__(**kwargs)

    def setup(self):
        self.client = boto3.client('sqs', region_name=self.region)

    def start(self):
        self.container.spawn_managed_thread(
            self.run, identifier="SqsReceiver.run"
        )

    def run(self):
        while True:
            response = self.client.receive_message(
                QueueUrl=self.url,
                WaitTimeSeconds=5,
            )
            messages = response.get('Messages', ())
            for message in messages:
                self.handle_message(message)

    def handle_message(self, message):
        handle_result = partial(self.handle_result, message)

        args = (message['Body'],)
        kwargs = {}

        self.container.spawn_worker(
            self, args, kwargs, handle_result=handle_result
        )

    def handle_result(self, message, worker_ctx, result, exc_info):
        # assumes boto client is thread-safe for this action, because it
        # happens inside the worker threads
        self.client.delete_message(
            QueueUrl=self.url,
            ReceiptHandle=message['ReceiptHandle']
        )
        return result, exc_info


receive = SqsReceive.decorator

在服务类中使用

from .sqs_receive import receive


class SqsService(object):
    name = "sqs-service"

    @receive('https://sqs.eu-west-1.amazonaws.com/123456789012/nameko-sqs')
    def handle_sqs_message(self, body):
        """ This method is called by the `receive` entrypoint whenever
        a message sent to the given SQS queue.
        """
        print(body)
        return body

期望的异常

Entrypoint 基类构造器能接受一个异常的列表,用于区分方法返回的异常的不同。

class Service:
    name = "service"

    auth = Auth()

    @rpc(expected_exceptions=Unauthorized)
    def update(self, data):
        if not self.auth.has_role("admin"):
            raise Unauthorized()

        # perform update
        raise TypeError("Whoops, genuine error.")

更觉不同的异常可以,用于检查而做出不同的响应逻辑。相关的检测库有 nameko-sentry。

敏感的参数

Entrypoint 构造器允许你标记某个部分的参数为敏感参数。

class Service:
    name = "service"

    auth = Auth()

    @rpc(sensitive_arguments="password", expected_exceptions=Unauthenticated)
    def login(self, username, password):
        # raises Unauthenticated if username/password do not match
        return self.auth.authenticate(username, password)

nameko.utils.get_redacted_args()方法将隐藏敏感参数返回 **

一个有用的扩展可以记录和保存进口点的调用信息:nameko-tracer

# by dictionary key
@entrypoint(sensitive_arguments="foo.a")
def method(self, foo):
    pass

>>> get_redacted_args(method, foo={'a': 1, 'b': 2})
... {'foo': {'a': '******', 'b': 2}}

# list index
@entrypoint(sensitive_arguments="foo.a[1]")
def method(self, foo):
    pass

>>> get_redacted_args(method, foo=[{'a': [1, 2, 3]}])
... {'foo': {'a': [1, '******', 3]}}

繁殖后台线程

扩展,需要在一个线程里执行工作,它可能需要选择使用spawn_managed_thread()委托service 容器来管理线程。

 def start(self):
        self.container.spawn_managed_thread(
            self.run, identifier="SqsReceiver.run"
        )

推荐委托线程管理到容器,因为

1.管理的线程将总是当容器停止或者被杀死时被终止。
2.在管理的线程中未处理的异常会由容器捕获,而且会导致线程产生错误信息并终止,这个机制可以防止进程挂起。
 类似资料: