Installing and Using zerorpc, a Distributed RPC Framework for Python

秦德海
2023-12-01

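1. Introduction and installation

RPC makes building distributed systems much simpler and is widely used in cloud computing implementations.
RPC calls can also be asynchronous.

To implement RPC in Python you can use SimpleXMLRPCServer from the standard library, or zerorpc, a third-party library.
zerorpc is built on ZeroMQ and MessagePack, so it is comparatively fast, with short response times and high concurrency.

zerorpc has to be installed separately while SimpleXMLRPCServer does not, but SimpleXMLRPCServer performs comparatively worse.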
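
For comparison, here is a minimal sketch of the standard-library route. It assumes Python 3, where SimpleXMLRPCServer lives in the xmlrpc.server module (on Python 2 it is a top-level module of the same name). The helper names start_server and call_hello are made up for this illustration.

```python
import threading
from xmlrpc.client import ServerProxy
from xmlrpc.server import SimpleXMLRPCServer


def hello(name):
    """Remote procedure exposed over XML-RPC."""
    return "Hello, %s" % name


def start_server():
    # Port 0 asks the OS for any free port; logRequests=False keeps output quiet.
    server = SimpleXMLRPCServer(("127.0.0.1", 0), logRequests=False)
    server.register_function(hello, "hello")
    thread = threading.Thread(target=server.serve_forever)
    thread.daemon = True
    thread.start()
    return server


def call_hello(name):
    # Start a throwaway server, make one call, then shut the server down.
    server = start_server()
    try:
        host, port = server.server_address
        proxy = ServerProxy("http://%s:%d" % (host, port))
        return proxy.hello(name)
    finally:
        server.shutdown()
        server.server_close()


if __name__ == "__main__":
    print(call_hello("RPC"))
```

Server and client run in one process here only to keep the sketch short; in practice they would be separate programs, as in the zerorpc examples in this article.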

Installing zerorpc:

yum -y install gcc*
yum -y install gcc-c++ libuuid-devel python-uuid uuid
yum -y install zlib zlib-devel

Install pip:
wget https://bootstrap.pypa.io/get-pip.py
python get-pip.py

Install zerorpc with pip:
pip install zerorpc

zerorpc in turn depends on msgpack-python, pyzmq, future, greenlet and gevent.

Of these, gevent provides coroutine support.

Installing collected packages: msgpack-python, pyzmq, future, greenlet, gevent, zerorpc
  Running setup.py install for msgpack-python ... done
  Running setup.py install for future ... done
  Running setup.py install for zerorpc ... done
Successfully installed future-0.16.0 gevent-1.2.1 greenlet-0.4.12 msgpack-python-0.4.8 pyzmq-16.0.2 zerorpc-0.6.1

Official site: http://www.zerorpc.io/

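2. Usage examples:

Note: these experiments ran successfully on CentOS 6.
On CentOS 7, however, debugging kept failing with a zerorpc request timeout: zerorpc.exceptions.LostRemote: Lost remote after 10s heartbeat
The same error may also appear when a request has no matching handler function: zerorpc does not raise an exception and simply waits until the timeout.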

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/python27/lib/python2.7/site-packages/zerorpc/core.py", line 278, in <lambda>
    return lambda *args, **kargs: self(method, *args, **kargs)
  File "/usr/local/python27/lib/python2.7/site-packages/zerorpc/core.py", line 270, in __call__
    return self._process_response(request_event, bufchan, timeout)
  File "/usr/local/python27/lib/python2.7/site-packages/zerorpc/core.py", line 227, in _process_response
    reply_event = bufchan.recv(timeout=timeout)
  File "/usr/local/python27/lib/python2.7/site-packages/zerorpc/channel.py", line 255, in recv
    event = self._input_queue.get(timeout=timeout)
  File "/usr/local/python27/lib/python2.7/site-packages/gevent/queue.py", line 284, in get
    return self.__get_or_peek(self._get, block, timeout)
  File "/usr/local/python27/lib/python2.7/site-packages/gevent/queue.py", line 261, in __get_or_peek
    result = waiter.get()
  File "/usr/local/python27/lib/python2.7/site-packages/gevent/hub.py", line 899, in get
    return self.hub.switch()
  File "/usr/local/python27/lib/python2.7/site-packages/gevent/hub.py", line 630, in switch
    return RawGreenlet.switch(self)
zerorpc.exceptions.LostRemote: Lost remote after 10s heartbeat
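
The cause of the CentOS 7 failure is not known. Before digging into zerorpc itself, it may be worth ruling out basic connectivity, for example a firewall blocking the port (an assumption, not something confirmed above). The sketch below uses only the standard library; port_is_reachable is a name made up for this illustration.

```python
import socket


def port_is_reachable(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        conn = socket.create_connection((host, port), timeout=timeout)
    except (OSError, socket.timeout):
        # Refused, unreachable or timed out: treat all of these as "not reachable".
        return False
    conn.close()
    return True


if __name__ == "__main__":
    # 4242 is the port used by the zerorpc examples in this article.
    print(port_is_reachable("127.0.0.1", 4242))
```

If this returns False while the server is running, the problem lies below zerorpc. If it returns True, the TCP path is fine and the heartbeat timeout has some other cause.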

1. Basic usage

Server:

import zerorpc

class HelloRPC(object):
    def hello(self, name):
        return "Hello, %s" % name

s = zerorpc.Server(HelloRPC())
s.bind("tcp://0.0.0.0:4242")
s.run()

Client:

import zerorpc

c = zerorpc.Client()
c.connect("tcp://127.0.0.1:4242")
print c.hello("RPC")

Result:

>>> c.connect("tcp://127.0.0.1:4242")
[None]
>>> print c.hello("RPC")
Hello, RPC
>>> 

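Note:
The line s = zerorpc.Server(HelloRPC()) on the server registers the service, and only one class can be registered. If you register more than one, the server usually starts without any error, but an exception may be raised when the client sends a call.
For example:

Server: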

import zerorpc

class HelloRPC(object):
    def hello(self, name):
        return "Hello, %s" % name
class resRPC(object):
    def x(self, name):
        return "Hello, %s" % name

s = zerorpc.Server(HelloRPC(),resRPC())
# or register the two separately
#s = zerorpc.Server(resRPC())
#s = zerorpc.Server(HelloRPC())
s.bind("tcp://0.0.0.0:4242")
s.run()

Client:
import zerorpc

c = zerorpc.Client()
c.connect("tcp://127.0.0.1:4242")
print c.hello("RPC")
print c.x("RPC")

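Running print c.x("RPC") raises an exception like the following, triggered by raise NameError(event.name) on the server. The traceback shown here was captured with a different method name, listinfo; the name after NameError is the method the server could not find.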

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/python27/lib/python2.7/site-packages/zerorpc/core.py", line 278, in <lambda>
    return lambda *args, **kargs: self(method, *args, **kargs)
  File "/usr/local/python27/lib/python2.7/site-packages/zerorpc/core.py", line 270, in __call__
    return self._process_response(request_event, bufchan, timeout)
  File "/usr/local/python27/lib/python2.7/site-packages/zerorpc/core.py", line 238, in _process_response
    reply_event, self._handle_remote_error)
  File "/usr/local/python27/lib/python2.7/site-packages/zerorpc/patterns.py", line 45, in process_answer
    raise exception
zerorpc.exceptions.RemoteError: Traceback (most recent call last):
  File "/usr/local/python27/lib/python2.7/site-packages/zerorpc/core.py", line 152, in _async_task
    raise NameError(event.name)
NameError: listinfo
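
One way around the single-class limit is to merge the classes into one object before registering it. This is a minimal sketch: CombinedRPC and serve are names made up here, and the remark about the second argument of zerorpc.Server is my reading of the library, not something verified in this article.

```python
class HelloRPC(object):
    def hello(self, name):
        return "Hello, %s" % name


class ResRPC(object):
    def x(self, name):
        return "Hello, %s" % name


class CombinedRPC(HelloRPC, ResRPC):
    """Exposes the public methods of both classes through one object."""


def serve(endpoint="tcp://0.0.0.0:4242"):
    # Imported here so the classes above can be exercised without zerorpc.
    import zerorpc

    # Only the first positional argument is treated as the methods object
    # (assumption: the second one is the server name), so pass one object.
    server = zerorpc.Server(CombinedRPC())
    server.bind(endpoint)
    server.run()
```

Calling serve() starts the server; the client can then call both c.hello("RPC") and c.x("RPC") on the same endpoint. If two classes define a method with the same name, the one from the class listed first wins.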

2. Streaming responses

Server:

import zerorpc

class StreamingRPC(object):
    @zerorpc.stream
    def streaming_range(self, fr, to, step):
        return xrange(fr, to, step)

s = zerorpc.Server(StreamingRPC())
s.bind("tcp://0.0.0.0:4242")
s.run()

Client:

import zerorpc

c = zerorpc.Client()
c.connect("tcp://127.0.0.1:4242")

for item in c.streaming_range(10, 20, 2):
    print item

Result:

>>> c.connect("tcp://127.0.0.1:4242")
[None]
>>> 
>>> for item in c.streaming_range(10, 20, 2):
...    print item
... 
10
12
14
16
18
>>>     

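Note:
The @zerorpc.stream decorator on the function is required here. Without it, as in the following server, an exception such as TypeError: can't serialize is raised: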

import zerorpc

class StreamingRPC(object):
    def streaming_range(self, fr, to, step):
        return xrange(fr, to, step)

s = zerorpc.Server(StreamingRPC())
s.bind("tcp://0.0.0.0:4242")
s.run()

Exception output:

    raise exception
zerorpc.exceptions.RemoteError: Traceback (most recent call last):
  File "/usr/local/python27/lib/python2.7/site-packages/zerorpc/core.py", line 153, in _async_task
    functor.pattern.process_call(self._context, bufchan, event, functor)
  File "/usr/local/python27/lib/python2.7/site-packages/zerorpc/patterns.py", line 34, in process_call
    channel.emit_event(rep_event)
  File "/usr/local/python27/lib/python2.7/site-packages/zerorpc/channel.py", line 234, in emit_event
    self._channel.emit_event(event)
  File "/usr/local/python27/lib/python2.7/site-packages/zerorpc/heartbeat.py", line 116, in emit_event
    self._channel.emit_event(event, timeout)
  File "/usr/local/python27/lib/python2.7/site-packages/zerorpc/channel.py", line 154, in emit_event
    self._multiplexer.emit_event(event, timeout)
  File "/usr/local/python27/lib/python2.7/site-packages/zerorpc/channel.py", line 66, in emit_event
    return self._events.emit_event(event, timeout)
  File "/usr/local/python27/lib/python2.7/site-packages/zerorpc/events.py", line 347, in emit_event
    parts.extend([b'', event.pack()])
  File "/usr/local/python27/lib/python2.7/site-packages/zerorpc/events.py", line 208, in pack
    r = msgpack.Packer(use_bin_type=True).pack(payload)
  File "msgpack/_packer.pyx", line 231, in msgpack._packer.Packer.pack (msgpack/_packer.cpp:3661)
  File "msgpack/_packer.pyx", line 233, in msgpack._packer.Packer.pack (msgpack/_packer.cpp:3503)
  File "msgpack/_packer.pyx", line 221, in msgpack._packer.Packer._pack (msgpack/_packer.cpp:3230)
  File "msgpack/_packer.pyx", line 221, in msgpack._packer.Packer._pack (msgpack/_packer.cpp:3230)
  File "msgpack/_packer.pyx", line 228, in msgpack._packer.Packer._pack (msgpack/_packer.cpp:3382)
TypeError: can't serialize xrange(10, 20, 2)
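
The traceback shows msgpack refusing to pack the xrange object itself. When the result is small and streaming is not needed, an alternative is to return a plain list, which is ordinary data. A minimal sketch, where RangeRPC, list_range and serve are names made up for this illustration:

```python
class RangeRPC(object):
    def list_range(self, fr, to, step):
        # A list of ints is plain data, so it can be sent in a single reply.
        # range() is used so the sketch also runs on Python 3 (no xrange there).
        return list(range(fr, to, step))


def serve(endpoint="tcp://0.0.0.0:4242"):
    # Imported here so RangeRPC can be exercised without zerorpc installed.
    import zerorpc

    server = zerorpc.Server(RangeRPC())
    server.bind(endpoint)
    server.run()
```

The trade-off is that the whole list is built and sent at once, so for large or open-ended sequences the @zerorpc.stream version is the better fit.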

3. Passing multiple arguments

Server:

import zerorpc

class myRPC(object):
    def listinfo(self,message):
        return "get info : %s"%message

    def getpow(self,n,m):
        return n**m           

s = zerorpc.Server(myRPC())
s.bind("tcp://0.0.0.0:4242")
s.run()

Client:


import zerorpc

c = zerorpc.Client()
c.connect("tcp://127.0.0.1:4242")
print c.listinfo("this is test string")
print c.getpow(2,5)

Note: every method definition needs self as its first parameter; otherwise a TypeError is raised:
TypeError: getpow() takes exactly 2 arguments (3 given)
For example, change getpow to the following:

    def getpow(n,m):
        return n**m     

Exception output:

    raise exception
zerorpc.exceptions.RemoteError: Traceback (most recent call last):
  File "/usr/local/python27/lib/python2.7/site-packages/zerorpc/core.py", line 153, in _async_task
    functor.pattern.process_call(self._context, bufchan, event, functor)
  File "/usr/local/python27/lib/python2.7/site-packages/zerorpc/patterns.py", line 30, in process_call
    result = functor(*req_event.args)
  File "/usr/local/python27/lib/python2.7/site-packages/zerorpc/decorators.py", line 44, in __call__
    return self._functor(*args, **kargs)
TypeError: getpow() takes exactly 2 arguments (3 given)
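
The error is ordinary Python behaviour rather than anything specific to zerorpc: a method looked up on an instance receives that instance as its first argument. The sketch below reproduces it locally; Broken, Fixed and try_call are names made up for this illustration, and the exact wording of the TypeError differs between Python versions.

```python
class Broken(object):
    def getpow(n, m):  # missing self: n receives the instance
        return n ** m


class Fixed(object):
    def getpow(self, n, m):
        return n ** m


def try_call(obj, *args):
    """Call obj.getpow(*args) and return the result or the TypeError text."""
    try:
        return obj.getpow(*args)
    except TypeError as exc:
        return "TypeError: %s" % exc


if __name__ == "__main__":
    print(try_call(Fixed(), 2, 5))
    print(try_call(Broken(), 2, 5))
```

Broken().getpow(2, 5) passes three values (the instance, 2 and 5) to a function that accepts two, which is what "takes exactly 2 arguments (3 given)" reports.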

4. Getting the time
Server:

import zerorpc
import datetime

class GetMessage(object):
    def showget(self, message):
        # "message" avoids shadowing the built-in str
        return "get message : %s" % message

    def showtime(self):
        now = datetime.datetime.now()
        return now.strftime('%Y-%m-%d %H:%M:%S')

s = zerorpc.Server(GetMessage())
s.bind("tcp://0.0.0.0:8081")
s.run()

Client:

import zerorpc

myclient = zerorpc.Client()
myclient.connect("tcp://192.168.137.19:8081")
print myclient.showget("RPC")
print myclient.showtime()
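
showtime() returns a formatted string rather than the datetime object. Presumably a datetime would fail to serialize in the same way the xrange object did earlier (an assumption, not tested here). If the client needs a real datetime again, it can parse the string with the same format. A minimal sketch, where TIME_FORMAT, format_time and parse_time are names made up for this illustration:

```python
import datetime

# Same format string as the one used by showtime() on the server.
TIME_FORMAT = "%Y-%m-%d %H:%M:%S"


def format_time(moment):
    """Server side: turn a datetime into a plain string."""
    return moment.strftime(TIME_FORMAT)


def parse_time(text):
    """Client side: turn the string back into a datetime."""
    return datetime.datetime.strptime(text, TIME_FORMAT)


if __name__ == "__main__":
    now = datetime.datetime.now()
    print(parse_time(format_time(now)))
```

The format has one-second resolution, so microseconds are lost in the round trip.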

5. Returning file contents

Server:

import zerorpc

class readfile(object):
    def readone(self, filepath):
        # Return the first line of the file.
        try:
            with open(filepath) as f:
                line = f.readline()
            return "%s file list one : %s" % (filepath, line)
        except IOError:
            return "could not open %s, the file may not exist." % filepath

    def readall(self, filepath):
        # Return every line of the file as a list.
        try:
            with open(filepath) as f:
                lines = f.readlines()
            return "%s file list one : %s" % (filepath, lines)
        except IOError:
            return "could not open %s, the file may not exist." % filepath

s = zerorpc.Server(readfile())
s.bind("tcp://0.0.0.0:8081")
s.run()

Client:

import zerorpc

myclient = zerorpc.Client()
myclient.connect("tcp://192.168.137.19:8081")
print myclient.readone("ip.txt")
print myclient.readall("ip.txt")

Result:

>>> print myclient.readone("ip.txt")
ip.txt file list one : 192.168.1.1

>>> print myclient.readall("ip.txt")
ip.txt file list one : ['192.168.1.1\n', '192.168.1.2\n', '192.168.1.3\n', '192.168.1.4\n', '192.168.1.5\n', '192.168.1.6\n']
>>> 

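Also:
Variable keyword arguments (**kwargs) currently do not reach the server method, so the call cannot return the expected result; avoid them for now.

This is probably because zerorpc itself uses variable arguments in its internal calls, so user-supplied keyword arguments are not passed through,
for example: self._functor(*args, **kargs)
Example: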

server:
import zerorpc

class myRPC(object):
    def func(self,**kwargs):
       mes=[]
       for key in kwargs:
           ks="%s: %s"%(key, kwargs[key])
           mes.append(ks)

       return mes     

s = zerorpc.Server(myRPC())
s.bind("tcp://0.0.0.0:4242")
s.run()



client:
import zerorpc

c = zerorpc.Client()
c.connect("tcp://127.0.0.1:4242")
print c.func(a="abc")

The expected result is not returned:

>>> c.connect("tcp://127.0.0.1:4242")
[None] 
>>> print c.func(a="abc")
[]
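
Since keyword arguments do not seem to arrive at the server method, one possible workaround is to pass a dict as a single positional argument. This is a minimal sketch that has not been run against zerorpc 0.6.1; the parameter name options is made up here, and the keys are sorted only to make the output order predictable.

```python
class MyRPC(object):
    def func(self, options):
        # options is an ordinary dict passed as one positional argument.
        return ["%s: %s" % (key, options[key]) for key in sorted(options)]


def serve(endpoint="tcp://0.0.0.0:4242"):
    # Imported here so MyRPC can be exercised without zerorpc installed.
    import zerorpc

    server = zerorpc.Server(MyRPC())
    server.bind(endpoint)
    server.run()
```

On the client the call would then be c.func({"a": "abc"}) instead of c.func(a="abc").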