我刚开始使用 ZeroMQ,正在尝试用 Python 3.6 中的 PyZMQ 和 asyncio 实现一个 Hello World。我想把模块本身的功能与发布/订阅(PUB/SUB)代码分离,因此类的设置如下:
编辑1:最小化示例
编辑2:包含的解决方案,请参见答案。
import asyncio
import zmq.asyncio
from zmq.asyncio import Context
# manages message flow between publishers and subscribers
class HelloWorldMessage:
    """Run a minimal ZeroMQ PUB/SUB "Hello World" demo on the asyncio loop.

    Constructing an instance builds the TCP endpoint, takes the shared
    ``zmq.asyncio`` context and then blocks inside the event loop while one
    publisher coroutine and one subscriber coroutine run side by side.  Both
    coroutines loop forever, so the constructor only returns control when
    one of them fails.

    Args:
        url: Host part of the TCP endpoint.
        port: Port part of the TCP endpoint.

    Raises:
        Exception: Whatever exception made the publisher or the subscriber
            coroutine stop is re-raised from the constructor instead of
            being left unnoticed inside a finished task.
    """

    def __init__(self, url='127.0.0.1', port='5555'):
        self.url = "tcp://{}:{}".format(url, port)
        self.ctx = Context.instance()

        # activate publishers / subscribers
        loop = asyncio.get_event_loop()
        # asyncio.wait() has to be given tasks: passing bare coroutines is
        # deprecated since Python 3.8 and rejected since Python 3.11.
        tasks = [
            loop.create_task(self.pub_hello_world()),
            loop.create_task(self.sub_hello_world()),
        ]
        # Stop at the first failure.  Waiting for *all* tasks would keep the
        # loop spinning on the surviving coroutine while the exception of the
        # dead one sits unread in its task ("waits forever, no traceback").
        done, pending = loop.run_until_complete(
            asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
        )
        # Cancel the survivor and let it unwind so its ``finally`` clause
        # closes the socket.
        for task in pending:
            task.cancel()
        if pending:
            loop.run_until_complete(asyncio.wait(pending))
        # Surface the failure to the caller.
        for task in done:
            task.result()

    async def pub_hello_world(self):
        """Publish "Hello World" on topic ``world`` once per second, forever."""
        pub = self.ctx.socket(zmq.PUB)
        try:
            pub.connect(self.url)

            # message contents
            msg = "Hello World"
            print(msg)
            # Frames must be bytes.  ``bytes(msg)`` raises TypeError for a
            # ``str`` on Python 3, so encode explicitly (once, outside the
            # loop, because the text never changes).
            payload = msg.encode('ascii')

            # keep sending messages
            while True:
                # Pause *before* sending: the PUB/SUB link needs a moment to
                # be established, and a PUB socket drops whatever is sent
                # before a subscriber is connected.
                await asyncio.sleep(1)
                # publish message to topic 'world' (first frame is the topic)
                await pub.send_multipart([b'world', payload])
        finally:
            pub.close()

    async def sub_hello_world(self):
        """Print every multipart message received on topic ``world``, forever."""
        sub = self.ctx.socket(zmq.SUB)
        try:
            sub.bind(self.url)
            sub.setsockopt(zmq.SUBSCRIBE, b'world')

            # keep listening to all published message on topic 'world'
            while True:
                msg = await sub.recv_multipart()
                print('received: ', msg)
        finally:
            sub.close()
if __name__ == "__main__":
    # Script entry point: instantiating the class is all that is needed,
    # the demo is started from the constructor.
    HelloWorldMessage()
使用上面的代码,只会打印一次 `Hello World`,然后程序就一直等待下去。如果我按 Ctrl+C,会得到以下回溯信息:
python helloworld_pubsub.py
Hello World
^CTraceback (most recent call last):
File "helloworld_pubsub_stackoverflow.py", line 64, in <module>
HelloWorldMessage()
File "helloworld_pubsub_stackoverflow.py", line 27, in __init__
self.sub_hello_world(),
File "/*path*/zeromq/lib/python3.6/asyncio/base_events.py", line 454, in run_until_complete
self.run_forever()
File "/*path*/zeromq/lib/python3.6/asyncio/base_events.py", line 421, in run_forever
self._run_once()
File "/*path*/zeromq/lib/python3.6/asyncio/base_events.py", line 1395, in _run_once
event_list = self._selector.select(timeout)
File "/*path*/zeromq/lib/python3.6/selectors.py", line 445, in select
fd_event_list = self._epoll.poll(timeout, max_ev)
KeyboardInterrupt
版本:libzmq 4.2.3,pyzmq 17.0.0,Ubuntu 16.04
任何真知灼见,不胜感激。
1)众所周知,ZeroMQ 的 PUB/SUB 可扩展形式化通信模式(Scalable Formal Communication Archetype)在 `PUB`/`SUB` 真正准备好广播/接收消息之前需要一些时间。因此,最好在 `__init__()` 中就把基础设施搭建好,而不是等到 `SUB` 端本应已经在接收有效负载之前才去搭建。
在我看来,这将是一种更安全的设计方法:
class HelloWorldMessage:
    """Context-managed ZeroMQ PUB/SUB pair for the "Hello World" demo.

    Both sockets are created and wired up in ``__init__`` (SUB binds, PUB
    connects) so the link can start being established before the first
    message is published.  Leaving the ``with`` block closes both sockets
    and terminates the context.

    USAGE:
        with HelloWorldMessage() as hello:
            # schedule hello.pub_hello_world() and hello.sub_hello_world()
            # on the event loop here
            ...

    PARAMETERS:
        url:  host part of the TCP endpoint
        port: port part of the TCP endpoint

    THROWS:
        zmq.ZMQError from ``__init__`` if a socket cannot be created,
        bound, connected or configured.

    NOTE(review): the coroutines use ``PubHelloWorld`` and ``SubHelloWorld``,
    which are not defined in this part of the file -- confirm they are in
    scope wherever this class is used.
    """

    def __init__(self, url='127.0.0.1', port='5555'):
        self._url = "tcp://{}:{}".format(url, port)
        # pyzmq signals failure by raising zmq.ZMQError, so reaching one of
        # the print() calls below already means the preceding call worked.
        # (Comparing a fresh ``zmq.ZMQError()`` instance with 0 is always
        # False and made every step report an error.)
        # ---------------------------------------------------- CONTEXT:
        self._ctx = Context.instance()
        print("INF: zmq.asyncio.Context() set")
        # ---------------------------------------------------- SUB:
        self._sub = self._ctx.socket(zmq.SUB)
        print("INF: zmq.SUB set")
        self._sub.bind(self._url)
        print("INF: zmq.SUB.bind() done")
        # short LINGER so that closing the socket cannot stall termination
        self._sub.setsockopt(zmq.LINGER, 1)
        print("INF: zmq.SUB LINGER set")
        self._sub.setsockopt(zmq.SUBSCRIBE, b'world')
        print("INF: zmq.SUB subscribed")
        # ---------------------------------------------------- PUB:
        self._pub = self._ctx.socket(zmq.PUB)
        print("INF: zmq.PUB set")
        self._pub.setsockopt(zmq.LINGER, 1)
        print("INF: zmq.PUB LINGER set")
        self._pub.connect(self._url)
        print("INF: zmq.PUB.connect() done")

    def __enter__(self):
        """Return the instance itself for ``with <class> as <symbol>:``."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close both sockets, then terminate the context.

        Returns None, so an exception raised inside the ``with`` block is
        not suppressed.
        """
        self.try_to_close(self._pub)
        self.try_to_close(self._sub)
        self._ctx.term()
        return None

    ################################################################
    #
    # A PUB-SENDER ------------------------------------
    async def pub_hello_world(self):
        """Publish the text produced by ``PubHelloWorld`` on topic ``world``.

        Loops forever, one message per second.  An ordinary exception is
        reported and ends the loop; cancellation is propagated.  The PUB
        socket is closed on the way out in every case.
        """
        self._pObj = PubHelloWorld()
        # instance fuse: check that the helper yields the expected text
        print("INF: pObj set on PUB-side"
              if self._pObj.msg_pub() == "Hello World"
              else "ERR[9]: {0:}".format("Hello World"))
        try:
            while True:  # keep sending messages
                # Pause *before* sending so that the very first message is
                # not published into a link that is still being set up.
                await asyncio.sleep(1)
                print("NOP: await .sleep( 1 )")
                self._sMsg = self._pObj.msg_pub()
                print("INF: pObj.msg_pub() called"
                      if self._sMsg is not None
                      else "ERR[A]: {0:}".format("msg == ?"))
                print(self._sMsg)
                # publish message to topic 'world'; frames must be bytes and
                # bytes(<str>) without an encoding raises TypeError on
                # Python 3, hence the explicit encode()
                await self._pub.send_multipart([b'world',
                                                self._sMsg.encode('ascii')])
                print("INF: await .send_multipart()")
        except asyncio.CancelledError:
            # never swallow cancellation
            raise
        except Exception as exc:
            # best-effort demo: report the real cause and stop publishing
            print("ERR[D]: {0!r}".format(exc))
        finally:
            self._pub.close()
            print("FIN: PUB.close()-d")

    ################################################################
    #
    # A SUB-RECEIVER ---------------------------------
    async def sub_hello_world(self):
        """Receive messages on topic ``world`` and hand them to ``SubHelloWorld``.

        Loops forever.  An ordinary exception is reported and ends the loop;
        cancellation is propagated.  The SUB socket is closed on the way out
        in every case.
        """
        self._sObj = SubHelloWorld()
        # instance fuse: the receiver is expected to return None
        print("INF: sObj set on SUB-side"
              if self._sObj.msg_receive("?") is None
              else "ERR[F]: {0:}".format("?"))
        try:
            while True:  # keep listening to all published message on topic 'world'
                print("INF: await .recv_multipart() about to be called now:")
                self._rMsg = await self._sub.recv_multipart()
                print("INF: await .recv_multipart()")
                print('ACK: received: ', self._rMsg)
                self._sObj.msg_receive(self._rMsg)
                print('ACK: .msg_receive()-printed.')
        except asyncio.CancelledError:
            # never swallow cancellation
            raise
        except Exception as exc:
            # best-effort demo: report the real cause and stop listening
            print("ERR[H]: {0!r}".format(exc))
        finally:
            self._sub.close()
            print("FIN: SUB.close()-d")

    # ---------close()---------------------------------------
    def try_to_close(self, aSocketINSTANCE):
        """Close ``aSocketINSTANCE``, ignoring any ordinary error.

        Used during teardown, where a socket that is already closed (or was
        never created) must not prevent the remaining cleanup steps.
        """
        try:
            aSocketINSTANCE.close()
        except Exception:
            # deliberate best-effort: nothing useful can be done here
            pass
        return None
2)最好通过 `with HelloWorldMessage() as …:` 上下文管理器来使用它。
我的代码有 2 个错误:
1. PUB/SUB 通信模式需要一些时间进行初始化(参见 @user3666197 的回答)。我不得不将 `await asyncio.sleep(1)` 移到发布代码(`await pub.send_multipart([b'world', msg.encode('ascii')])`)之前。
2. 消息的编码方式不对:`bytes(msg)` 应改为 `msg.encode('ascii')`。
这个答案与我的问题最密切相关,但有关实现 PyZMQ 时的一些设计选择,请参阅 @user3666197 的回答。
似乎 PyZMQ 在 `asyncio.get_event_loop()` 中运行时不会给出错误回溯,因此请把代码包在 `try`/`except` 块中,例如:
import traceback
import logging
try:
while True:
msg_received = await sub.recv_multipart()
# do other stuff
except Exception as e:
print("Error with sub world")
logging.error(traceback.format_exc())