当前位置: 首页 > 知识库问答 >
问题:

Python 3.6 ZeroMQ(PyZMQ)asyncio pub sub Hello World

卢毅
2023-03-14

我刚刚开始使用ZeroMQ,我正在尝试让Hello World与Python 3.6中的PyZMQ和asyncio一起工作。我正在尝试将模块的功能与发布/子代码分离,因此,下面的类设置如下:

编辑1:最小化示例

编辑2:包含的解决方案,请参见答案。

import asyncio
import zmq.asyncio
from zmq.asyncio import Context

# manages message flow between publishers and subscribers
class HelloWorldMessage:
    def __init__(self, url='127.0.0.1', port='5555'):
        self.url = "tcp://{}:{}".format(url, port)
        self.ctx = Context.instance()

        # activate publishers / subscribers
        asyncio.get_event_loop().run_until_complete(asyncio.wait([
            self.pub_hello_world(),
            self.sub_hello_world(),
        ]))

    # generates message "Hello World" and publish to topic 'world'
    async def pub_hello_world(self):
        pub = self.ctx.socket(zmq.PUB)
        pub.connect(self.url)

        # message contents
        msg = "Hello World"
        print(msg)

        # keep sending messages
        while True:
            # --MOVED-- slow down message publication
            await asyncio.sleep(1) 

            # publish message to topic 'world'
            # async always needs `send_multipart()`
            await pub.send_multipart([b'world', msg.encode('ascii')])  # WRONG: bytes(msg)

    # processes message "Hello World" from topic 'world'
    async def sub_hello_world(self):
        sub = self.ctx.socket(zmq.SUB)
        sub.bind(self.url)
        sub.setsockopt(zmq.SUBSCRIBE, b'world')

        # keep listening to all published message on topic 'world'
        while True:
            msg = await sub.recv_multipart()
            # ERROR: WAITS FOREVER
            print('received: ', msg)

if __name__ == '__main__':
    HelloWorldMessage()

使用上面的代码,只打印1<code>Hello World</code>,然后永远等待。如果我按ctrl c,我会得到以下错误:

python helloworld_pubsub.py

Hello World
^CTraceback (most recent call last):
  File "helloworld_pubsub_stackoverflow.py", line 64, in <module>
    HelloWorldMessage()
  File "helloworld_pubsub_stackoverflow.py", line 27, in __init__
    self.sub_hello_world(),
  File "/*path*/zeromq/lib/python3.6/asyncio/base_events.py", line 454, in run_until_complete
    self.run_forever()
  File "/*path*/zeromq/lib/python3.6/asyncio/base_events.py", line 421, in run_forever
    self._run_once()
  File "/*path*/zeromq/lib/python3.6/asyncio/base_events.py", line 1395, in _run_once
    event_list = self._selector.select(timeout)
  File "/*path*/zeromq/lib/python3.6/selectors.py", line 445, in select
    fd_event_list = self._epoll.poll(timeout, max_ev)
KeyboardInterrupt

版本:libzmq: 4.2.3pyzmq: 17.0.0Ubuntu 16.04

任何真知灼见,不胜感激。

共有2个答案

屠坚壁
2023-03-14

1)众所周知,ZeroMQ PUB/SUB可伸缩正式通信原型在< code>PUB/SUB真正准备好广播/接受消息之前需要一些时间。因此,最好在< code >中设置基础设施。__init__()和not right before < code > SUB -s应该已经接收了一些有效负载

在我看来,这将是一种更安全的设计方法:

class HelloWorldMessage:
    """                                                       __doc__
    [DEF-ME]
    [DOC-ME]

    USAGE:     with HelloWorldMessage() as aContextManagerFUSEd_class_INSTANCE:
                    # may              use aContextManagerFUSEd_class_INSTANCE
                    # and shall safely
                    #     gracefully terminate locally spawned ZeroMQ resources
    PARAMETERS:
    RETURNS:   
    THROWS:    
    EXAMPLE:   
    REF.s:

    [TEST-ME]
    [PERF-ME]
    [PUB-ME]
    """
    def __init__( self, url  = '127.0.0.1',
                        port = '5555'
                        ):

        self._url = "tcp://{}:{}".format( url, port )
        #---------------------------------------------------- CONTEXT:
        self._ctx = Context.instance();                       print( "INF: zmq.asyncio.Context() set" if ( zmq.ZMQError() == 0 ) else "ERR[1]: {0:}".format( zmq.ZMQError() ) )
        #---------------------------------------------------- SUB:
        self._sub = self._ctx.socket(zmq.SUB );               print( "INF: zmq.SUB set"               if ( zmq.ZMQError() == 0 ) else "ERR[2]: {0:}".format( zmq.ZMQError() ) )
        self._sub.bind(                  self._url );         print( "INF: zmq.SUB.bind() done"       if ( zmq.ZMQError() == 0 ) else "ERR[3]: {0:}".format( zmq.ZMQError() ) )
        self._sub.setsockopt(        zmq.LINGER, 1 );         print( "INF: zmq.SUB LINGER set"        if ( zmq.ZMQError() == 0 ) else "ERR[4]: {0:}".format( zmq.ZMQError() ) )
        self._sub.setsockopt(        zmq.SUBSCRIBE, b'world');print( "INF: zmq.SUB subscribed"        if ( zmq.ZMQError() == 0 ) else "ERR[5]: {0:}".format( zmq.ZMQError() ) )
        #---------------------------------------------------- PUB:
        self._pub = self._ctx.socket(zmq.PUB );               print( "INF: zmq.PUB set"               if ( zmq.ZMQError() == 0 ) else "ERR[6]: {0:}".format( zmq.ZMQError() ) )
        self._pub.setsockopt(        zmq.LINGER, 1 );         print( "INF: zmq.PUB LINGER set"        if ( zmq.ZMQError() == 0 ) else "ERR[7]: {0:}".format( zmq.ZMQError() ) )
        self._pub.connect(               self._url );         print( "INF: zmq.PUB.connect() done"    if ( zmq.ZMQError() == 0 ) else "ERR[8]: {0:}".format( zmq.ZMQError() ) )
        #----------------------------------------------------
        ...
    def __enter__( self ):
        #---------------------------------------------------- with <class> as <symbol>: CONTEXT MANAGER __enter__()-auto-METHOD
        return self

    def __exit__( self, exc_type, exc_value, traceback ):
        #---------------------------------------------------- with <class> as <symbol>: CONTEXT MANAGER __exit__()-auto-METHOD
        self.try_to_close( self._pub );
        self.try_to_close( self._sub );
        pass;         self._ctx.term()
        return

    ################################################################
    #
    # A       PUB-SENDER ------------------------------------
    async def pub_hello_world( self ):

          self._pObj = PubHelloWorld();                       print( "INF: pObj set on PUB-side"      if ( self._pObj.msg_pub()  # instance-fuse(d)
                                                                                                         ==   "Hello World"    ) else "ERR[9]: {0:}".format( "Hello World" ) )
          try:
               while True:                                    # keep sending messages
                   self._sMsg = self._pObj.msg_pub();         print( "INF: pObj.msg_pub() called"     if ( self._sMsg  != None ) else "ERR[A]: {0:}".format( "msg == ?"    ) )
                   pass;                                      print( self._sMsg )
                   # publish message to topic 'world'
                   # async always needs `send_multipart()`
                   await self._pub.send_multipart( [ b'world',
                                                       bytes( self._sMsg )
                                                       ]
                                                  );          print( "INF: await .send_multipart()"   if ( zmq.ZMQError() == 0 ) else "ERR[B]: {0:}".format( zmq.ZMQError() ) )
                   # slow down message publication
                   await asyncio.sleep( 1 );                  print( "NOP: await .sleep( 1 )"         if ( zmq.ZMQError() == 0 ) else "ERR[C]: {0:}".format( zmq.ZMQError() ) )
          except:
              pass;                                           print( "EXC: thrown on PUB side"        if ( zmq.ZMQError() == 0 ) else "ERR[D]: {0:}".format( zmq.ZMQError() ) )

          finally:
              self._pub.close();                              print( "FIN: PUB.close()-d"             if ( zmq.ZMQError() == 0 ) else "ERR[E]: {0:}".format( zmq.ZMQError() ) )

    ################################################################
    #
    # A       SUB-RECEIVER ---------------------------------
    async def sub_hello_world( self ):

          self._sObj = SubHelloWorld();                       print( "INF: sObj set on SUB-side"      if (  None                 # instance-fuse(d)
                                                                                                         == self._sObj.msg_receive("?")
                                                                                                            )                    else "ERR[F]: {0:}".format( "?"            ) )
          try:
               while True:                                   # keep listening to all published message on topic 'world'
                     pass;                                    print( "INF: await .recv_multipart() about to be called now:" )
                     self._rMsg = await self._sub.recv_multipart()
                     pass;                                    print( "INF: await .recv_multipart()"   if ( zmq.ZMQError() == 0 ) else "ERR[G]: {0:}".format( zmq.ZMQError() ) )
                     pass;                                    print( 'ACK: received: ', self._rMsg )
                     self._sObj.msg_receive( self._rMsg );    print( 'ACK: .msg_receive()-printed.' )
          except:
              pass;                                           print( "EXC: thrown on SUB side"        if ( zmq.ZMQError() == 0 ) else "ERR[H]: {0:}".format( zmq.ZMQError() ) )

          finally:
              self._sub.close();                              print( "FIN: SUB.close()-d"             if ( zmq.ZMQError() == 0 ) else "ERR[I]: {0:}".format( zmq.ZMQError() ) )

    # ---------close()---------------------------------------
    def try_to_close( self, aSocketINSTANCE ):

        try:
            aSocketINSTANCE.close();

        except:
            pass;

        return

2)最好使用和Helloworld Message()作为…:上下文管理器

闻人举
2023-03-14

我的代码有2个错误:

  1. 正如@user3666197所提到的,PUB/SUB通信原型需要一些时间进行初始化(参见他/她的回答)。我不得不将等待asyncio.sleep(1)移到发布代码之上(等待pub.send_multipart([b'world',msg.encode('ascii')])
  2. 我把消息编码错了。bytes(msg)--

这个答案与我的问题最密切相关,但请查看@user3666197,了解实现PyZMQ时的某些设计选择。

似乎PyZMQ在<code>异步中。get_event_loop()不会提供错误回溯,因此,请将代码包装在try

import traceback
import logging

try:
    while True:
        msg_received = await sub.recv_multipart()
        # do other stuff

except Exception as e:
    print("Error with sub world")
    logging.error(traceback.format_exc())
 类似资料:

相关问答

相关文章

相关阅读