- 开始编辑时间:2022年5月22日
- 完成撰写时间:2022年5月23日
A microservices framework for Python that lets service developers concentrate on application logic and encourages testability.
一种python的微服务架构,让服务开发者专注于应用逻辑,并鼓励可测试性。
安装rabbitmq(因Nameko使用了RabiitMQ内建的AMQP RPC特性)
sudo apt-get install rabbitmq-server
安装nameko
pip install nameko
内建特性:
可以直接响应RPC信息,在特定操作时发送事件和侦听其它服务的事件。同时为支持AMQP的客户端提供HTTP接口,为Javascript客户端提供websocket接口。
Nameko鼓励”依赖关系注入模式”,这使编译和测试服务变得简洁。
一个Nameko服务就是一个python类。该类将应用逻辑压缩在类方法(Method)中,并将所有以来声明为属性。
方法通过特殊的入口装饰暴露给外部。如下代码所示(@rpc)
from nameko.rpc import rpc, RpcProxy
class Service:
name = "service"
# we depend on the RPC interface of "another_service"
other_rpc = RpcProxy("another_service")
@rpc # `method` is exposed over RPC
def method(self):
# application logic goes here
pass
入口是它们装饰的服务方法的网关,通常监控外部的实体,如消息队列。在相关事件上,入口可能“触发”,其所装饰的方法将在工作线程上执行。
依赖关系是隐藏不属于核心服务逻辑的代码的机会。将其它服务作为依赖项实现,声明一个依赖,将其作为服务代码与其它所有内容之间的网关。
当一个入口触发时,工作线程将创建。一个工作线程就是服务类的一个实例,但该实例依赖项的声明替换为这些依赖项的实例。
向一个服务添加依赖是声明式的。类的属性是一个声明,而不是工作线程能够实际上使用的接口。
类的属性式一个DependencyProvider. 依赖关系提供一个*get_denpendency()*方法,将其结果注入到新创建的工作线程中。
工作线程的生命周期:
worker = Service()
worker.other_rpc = worker.other_rpc.get_dependency()
worker.method()
del worker
Nameko 构建在 eventlet 库之上,该库通过“greenthreads”提供并发。并发模型是具有隐式 yield 的协同例程。每个工作线程都在自己的绿色线程中执行。可以根据每个工作线程等待 I/O 所花费的时间来调整并发工作线程的最大数量。
许多使用套接字且通常被视为线程安全的 C 扩展可能不适用于“greenthreads”。其中包括librabbitmq,MySQLdb等。
所有的入口和依赖提供都将作为扩展生效。这是因为入口和依赖都在服务代码以以外,而且髌骨吧是所有的服务都需要它们。
运行服务所需的只是服务类和相关的配置。运行一个或多个服务最简单的方式是使用Nameko CLI:
nameko run module:[ServiceClass]
这个命令将会在module中查找ServiceClass并执行。
每一个服务类都被委托给一个ServiceContainer.容器封装服务所需的所有设计功能,并且包含服务类上的所有扩展,实例如下:
from nameko.containers import ServiceContainer
class Service:
name = "service"
# create a container
container = ServiceContainer(Service, config={})
# ``container.extensions`` exposes all extensions used by the service
service_extensions = list(container.extensions)
# start service
container.start()
# stop service
container.stop()
ServiceRunner是包含多个容器的封装,对外暴露同时执行和关闭所封装容器的方法。这也是nameko run内部使用的方法。也可以单独执行ServiceRunner中的某一个容器。实例如下:
from nameko.runners import ServiceRunner
from nameko.testing.utils import get_container
class ServiceA:
name = "service_a"
class ServiceB:
name = "service_b"
# create a runner for ServiceA and ServiceB
runner = ServiceRunner(config={})
runner.add_service(ServiceA)
runner.add_service(ServiceB)
# ``get_container`` will return the container for a particular service
container_a = get_container(runner, ServiceA)
# start both services
runner.start()
# stop both services
runner.stop()
nameko run <module>[:<ServiceClass>]
搜索并执行一个服务类。这将在前台运行这个服务,知道进程终止。
默认配置参数可通过*–config*进行转换
nameko run --config ./footbar.yaml <modul>[:<ServiceClass>]
一个YMAL配置文件示例如下:
AMQP_URI: 'pyamqp://guest:guest@localhost'
WEB_SERVER_ADDRESS: '0.0.0.0:8000'
rpc_exchange: 'nameko-rpc'
max_workers: 10
parent_calls_tracked: 10
LOGGING:
version: 1
handlers:
console:
class: logging.StreamHandler
root:
level: DEBUG
handlers: [console]
配置的参数值可通过内置的Config接口获取。
# foobar.yaml
AMQP_URI: pyamqp://${RABBITMQ_USER:guest}:${RABBITMQ_PASSWORD:password}@${RABBITMQ_HOST:localhost}
RABBITMQ_USER=user RABBITMQ_PASSWORD=password RABBITMQ_HOST=host nameko run --config ./foobar.yaml <module>[:<ServiceClass>]
# foobar.yaml
AMQP_URI: !env_var "pyamqp://${RABBITMQ_USER:guest}:${RABBITMQ_PASSWORD:password}@${RABBITMQ_HOST:localhost}"
# foobar.yaml
...
THINGS: ${A_LIST_OF_THINGS}
A_LIST_OF_THINGS=[A,B,C] nameko run --config ./foobar.yaml <module>[:<ServiceClass>]
# foobar.yaml
LANDING_URL_TEMPLATE: ${LANDING_URL_TEMPLATE:https://example.com/{path}}
nameko shell
启动一个与远程nameko服务相互作用的python shell.先将一个模块n添加到内置的命名空间;nameko shell提供RPC呼叫和事件调度的功能。
$ nameko shell
>>> n.rpc.target_service.target_method(...)
# RPC response
$ nameko shell
>>> n.dispatch_event("source_service", "event_type", "event_payload")
Nameko提供基于AMQO实现的RPC扩展,其由@rpc入口组成,这是一种与其它服务交互的服务代理。非Nameko客户端的可用于对集群进行RPC调用的独立代理。
from nameko.rpc import rpc, RpcProxy
class ServiceY:
name = "service_y"
@rpc
def append_identifier(self, value):
//对外暴露的接口代理
return u"{}-y".format(value)
class ServiceX:
name = "service_x"
y = RpcProxy("service_y") // 通过name获取服务代理
@rpc
def remote_method(self, value):
res = u"{}-x".format(value)
return self.y.append_identifier(res)
from nameko.standalone.rpc import ClusterRpcProxy
config = {
'AMQP_URI': AMQP_URI # e.g. "pyamqp://guest:guest@localhost"
}
with ClusterRpcProxy(config) as cluster_rpc:
cluster_rpc.service_x.remote_method("hellø") # "hellø-x-y" 通过cluster获取某一个服务的接口,并进行远程调用
一般的RPC回调都在远程方法完成后执行,但代理也有后台异步调用模式或者并行RPC调用:
with ClusterRpcProxy(config) as cluster_rpc:
hello_res = cluster_rpc.service_x.remote_method.call_async("hello")
world_res = cluster_rpc.service_x.remote_method.call_async("world")
# do work while waiting 异步调用期间可以执行其它任务
hello_res.result() # "hello-x-y"
world_res.result() # "world-x-y"
在目标服务具有一个以上实例的集群中,RPC请求在实例之间进行轮询。请求最终将会被目标服务的一个实例处理。
AMQP消息仅会在请求被成功处理后发送确认消息。如果服务无法确认消息并且AMQP连接已关闭,则代理将撤销消息,然后消息分配给可用的服务实例。
请求和回应有效负载被序列化到JSON格式传输。
Nameko 事件是一个异步消息系统,实现了发布和订阅模式。服务调度(发送)由一个或多个服务接收的事件:
from nameko.events import EventDispatcher, event_handler
from nameko.rpc import rpc
class ServiceA:
""" Event dispatching service. """
name = "service_a"
dispatch = EventDispatcher() //事件分发实例
@rpc
def dispatching_method(self, payload):
// 发布可远程调用
self.dispatch("event_type", payload) // 发布 {event_type:payload}
class ServiceB:
""" Event listening service. """
name = "service_b"
@event_handler("service_a", "event_type")
def handle_event(self, payload):
print("service b received:", payload)
Eventhandler入口函数具有三个handler_type,决定事件消息将如何从集群中接收。
一个Broadcast实例:(需要在入口函数中指明:handler_type=BROADCAST)
from nameko.events import BROADCAST, event_handler
class ListenerService:
name = "listener"
@event_handler(
"monitor", "ping", handler_type=BROADCAST, reliable_delivery=False
)
def ping(self, payload):
# all running services will respond
print("pong from {}".format(self.name))
HTTP入口函数建立在werkzeug上,支持所有的标准HTTP方法(GET,POST,DELET,PUT等)。可通过逗号分割同时支持多种方法:
默认端口号为:8000
# http.py
import json
from nameko.web.handlers import http
class HttpService:
name = "http_service"
@http('GET', '/get/<int:value>')
def get_method(self, request, value):
return json.dumps({'value': value})
@http('POST', '/post')
def do_post(self, request):
return u"received: {}".format(request.get_data(as_text=True))
@http('GET,PUT,POST,DELETE', '/multi')
def do_multi(self, request):
return request.method
官方不建议使用namekohttp功能,可直接使用Django。利用库django_nameko中的get_pool直接在Django接口中调用Nameko的微服务。
定时器入口能够根据配置的秒数进行不断地触发,是服务实例内进行触发。
from nameko.timer import timer
class Service:
name ="service"
@timer(interval=1)
def ping(self):
# 服务将会每一秒打印一个 "pong"
# method executed every second
print("pong")
Nameko通过“依赖提供 dependency providers 能够导入配置。
Config是一个简单的依赖提供器,提供给服务在运行时仅读配置变量的权限。
from nameko.dependency_providers import Config
from nameko.web.handlers import http
class Service:
name = "test_config"
config = Config() // 实例化配置
@property
def foo_enabled(self):
return self.config.get('FOO_FEATURE_ENABLED', False) //通过get获取配置参数值
@http('GET', '/foo')
def foo(self, request):
if not self.foo_enabled:
return 403, "FeatureNotEnabled"
return 'foo'
在核心项目以外,社区提供了大量的nameko扩展和增强库,以方便开发者使用。
Nameko的惯例就是将测试设计的尽可能简单。微服务是小且目的单一的;依赖注入使设计功能的替换和隔离变得简单。
Nameko中的单元测试通常指对一个独立的服务进行独立测试,不考虑任何其它依赖。
wroker_factory()根据所给的服务创建一个工程线程,服务的依赖将被mock.MagicMock对象替换。可通过side_effect和return_value模仿依赖功能。
""" Service unit testing best practice.
"""
from nameko.rpc import RpcProxy, rpc
from nameko.testing.services import worker_factory
class ConversionService(object):
""" Service under test 用于测试的服务
"""
name = "conversions"
maths_rpc = RpcProxy("maths")
@rpc
def inches_to_cm(self, inches):
return self.maths_rpc.multiply(inches, 2.54)
@rpc
def cms_to_inches(self, cms):
return self.maths_rpc.divide(cms, 2.54)
def test_conversion_service():
# create worker with mock dependencies
# 创建模拟以来的工作线程
service = worker_factory(ConversionService)
# add side effects to the mock proxy to the "maths" service
# maths是微服务的一个依赖,通过add side effect模拟代理这个微服务。
service.maths_rpc.multiply.side_effect = lambda x, y: x * y
service.maths_rpc.divide.side_effect = lambda x, y: x / y
# test inches_to_cm business logic
# 测试 微服务的某个接口的逻辑,这个接口中的其它服务依赖,已由add side effect模拟代理。
# 使用assert进行断言。
assert service.inches_to_cm(300) == 762
service.maths_rpc.multiply.assert_called_once_with(300, 2.54)
# test cms_to_inches business logic
assert service.cms_to_inches(762) == 300
service.maths_rpc.divide.assert_called_once_with(762, 2.54)
有些时候,提供可选择的依赖而不是模拟更合适,例如全流程测试,如数据连接。
""" Service unit testing best practice, with an alternative dependency.
"""
import pytest
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from nameko.rpc import rpc
from nameko.testing.services import worker_factory
# using community extension from http://pypi.python.org/pypi/nameko-sqlalchemy
from nameko_sqlalchemy import Session
Base = declarative_base()
class Result(Base):
__tablename__ = 'model'
id = Column(Integer, primary_key=True)
value = Column(String(64))
class Service:
""" Service under test
"""
name = "service"
db = Session(Base)
@rpc
def save(self, value):
result = Result(value=value)
self.db.add(result)
self.db.commit()
@pytest.fixture
def session():
""" Create a test database and session
"""
engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)
session_cls = sessionmaker(bind=engine)
return session_cls()
def test_service(session):
# create instance, providing the test database session
service = worker_factory(Service, db=session)
# verify ``save`` logic by querying the test database
service.save("helloworld")
assert session.query(Result.value).all() == [("helloworld",)]
Nameko中的一体化测试就是测试服务间的接口。推荐的方式是:按照正常方式执行所有的测试服务,然后使用helper触发一个入口函数.
""" Service integration testing best practice.
"""
from nameko.rpc import rpc, RpcProxy
from nameko.testing.utils import get_container
from nameko.testing.services import entrypoint_hook
class ServiceX:
""" Service under test
"""
name = "service_x"
y = RpcProxy("service_y") // 依赖
@rpc
def remote_method(self, value):
res = "{}-x".format(value)
return self.y.append_identifier(res)
class ServiceY:
""" Service under test
"""
name = "service_y"
@rpc
def append_identifier(self, value):
return "{}-y".format(value)
def test_service_x_y_integration(runner_factory, rabbit_config):
# run services in the normal manner
# 按照常规方式启动服务
runner = runner_factory(rabbit_config, ServiceX, ServiceY)
runner.start()
# artificially fire the "remote_method" entrypoint on ServiceX
# 人为触发ServiceX的remote_method入口函数,然后验证返回
# and verify response
container = get_container(runner, ServiceX)
with entrypoint_hook(container, "remote_method") as entrypoint:
assert entrypoint("value") == "value-x-y"
Nameko的测试部分nameko.testing.pytest是基于pytest实现的,如果了解pytest,将能够进行一些有用的配置。
Nameko的扩展组件需要继承于nameko.extensions.Extension。这部分内容较为深,不放在此处讨论。