当前位置: 首页 > 知识库问答 >
问题:

如何创建既适合发送又适合消费的ZeroMQ套接字?

淳于鹏
2023-03-14

请为以下场景提供ZeroMQ套接字体系结构建议:

1)端口上有服务器监听

2)有多个客户端同时连接服务器

3) 服务器接受来自客户端的所有连接,并为每个客户端提供双向队列,这意味着双方(客户端N或服务器)都可以发送或使用消息,即双方都可以是通信的发起方,另一方应该有一个回调来处理消息。

我们应该在每个接受的连接上创建额外的ZeroMQ套接字来从服务器推送消息吗?你能建议哪一个ZeroMQ套接字类型谷歌这样的架构?

共有2个答案

漆雕深
2023-03-14

我遇到了基本相同的问题。对我来说,问题似乎是即使在有GIL保护的python中,你也不能一次从两个线程操作一个套接字。这将导致解释器崩溃。因此,我们不能有发送和接收线程,而是一个线程,它必须能够同时接收和发送,而pyzmq无法同时轮询()套接字和非套接字。

用户3666197也回答了我的问题,但是两个回答都没有帮助。尤其是因为不能假设为所有客户端连接生成主机套接字需要1000个空闲和开放端口。

有一个相对丑陋的解决方案。这是一个缺乏适当的NOWAIT标志等的草稿。make_process_pull_socket()为进程内部的通信创建了一个套接字。如下:

class TwoWay(threading.Thread):
def __init__(self, ip_string, port, inque:queue.Queue):
    super(TwoWay, self).__init__()
    self.sock = cntxt.socket(zmq.ROUTER)
    addr = "tcp://{}:{}".format(ip_string, str(port))
    self.sock.bind(addr)
    self.pull = make_process_pull_socket(4456)
    self.push = make_process_push_socket(4456)
    self.inque = inque

def run(self):
    pl = zmq.Poller()
    pl.register(self.sock,zmq.POLLIN)
    pl.register(self.pull,zmq.POLLIN)
    while True:
        try:
            p = pl.poll(timeout=2000)
            if not p:
                continue
            s,i = p[0]
            if s == self.sock:
                a,m = s.recv_multipart()
                self.inque.put((a,m))
            if s == self.pull:
                r = s.recv()
                if len(r) > 5:
                    self.sock.send_multipart([r[:5], r[5:]])
        except Exception as e:
            print(e)

然后将传入消息放入作为参数提供的inque队列中,传出消息(带有标识符)可以双向发送。推发送(id消息)

齐航
2023-03-14

在每个接受的连接上创建额外的ZeroMQ套接字,用于从服务器推送消息?

最简单的基于组合的设计——可扩展性和安全性

本机ZeroMQ原语(智能原语可扩展的正式通信模式原型)对我们来说就像一个乐高积木——我们在应用领域中的预期目标用途的消息/信令平面上组合它们的进一步用途。

问:对于这种架构,你能告诉谷歌哪种ZeroMQ套接字类型吗?

没有,因为没有提供此类建议的详细需求清单。一对PUSH/PULL-s本身不需要满足要求,临时执行的(偶发的)REQ/REP可能有助于客户机(重新)发现阶段,就像其他共存的、持久的或偶发的原型构成任何额外的系统/服务平面一样。

 类似资料:
  • 我正在尝试创建一个DateTimeFormatter对象,其模式符合以下时间表达式:2016-07-22T00:00:00.000-05:00。我正在尝试使用带有上述输入字符串的DateTimeFormatter类创建DateTime对象。 我已经尝试了下面表达式的许多不同版本,但目前被困在时区片"-05:00",在那里我得到了我的jUnit测试用例的错误: 我使用的当前格式模式是: 我也尝试过:

  • 我正在参与一个项目,该项目将首先构建一个简单的消息系统,该系统将接收消息,存储消息并将其路由到适当的部门。基本用例是: < li >用户在网站表单中写下一条消息或一个问题,并选择一个部门将消息发送给该部门 < li >根据用户的选择,消息被发送到相应部门的消息队列,状态为“未读”、“已读”等。(我们还没有确定所有的状态)。 < li >这些信息成为用户与网站互动的一部分,即,如果用户拨打客服电话,

  • 本文向大家介绍创建topic时如何选择合适的分区数?相关面试题,主要包含被问及创建topic时如何选择合适的分区数?时的应答技巧和注意事项,需要的朋友参考一下 根据集群的机器数量和需要的吞吐量来决定适合的分区数

  • 我有一个嵌套的布局,如下所示: 我现在遇到的问题是,由于我所有的数据项都在子1或子2中,如果我添加或删除一个项,子Linearlayout将以animateLayoutChanges的效果设置动画,但父布局将不做任何动画。(对于所有线性布局,我将设置为)。尤其是当我删除子1中的一个项目时,动画效果会变得奇怪(基本上,当子1仍在制作动画时,子2会跳起来)。 有人知道怎么解决这个问题吗? 谢谢 更新

  • 客户点击导出excel,这时发送一条消息到 rocketmq, rocketmq在消费时,会有超时重试机制,默认15S,重试(不管有没有错误,到时间就认为超时,虽然可以修改) 生成个excel的时间可能在3~5分钟,可能生成时发生错误(如读取数据超时,有一定容错,但无法避免完全不出问题). 这种情况是不是太适合用rocketmq来做触发和重试? 用数据库做轮询是不是比较好? 或是更好的实现方式?

  • 我尝试为我的数据创建带有“转换”的插件到kafka-connect,并将其与不同的接收器连接器一起使用。当我安装插件时,kafka-connect看不到我的类。 我使用kafka connect maven插件创建了我的捆绑包zip。使用confluent hub(来自本地文件)的安装已成功。 所有文件都已解压,我的工作者属性已更新插件。路径。我在分布式模式下运行connect,并尝试从包中创建带