我们为分布式进程之间的消息传输实现了ZMQ PUB/SUB机制。但是由于订阅者的流转时长,消息的处理会有一定的延迟(有时由于排队消息的数量,延迟以小时为单位)。为了克服这种延迟,我计划根据进入发布者队列的未决消息的数量来扩展订阅者进程。
是否有任何机制来获取 ZMQ 发布者队列的计数/长度?
目前,我正在考虑使用发布服务器 RAM 利用率阈值来纵向扩展/缩减订阅服务器进程。
Q : 如何在 ZMQ 发布者/订阅者队列中获取消息/事件计数?
PUB
端有两个选项:
1)所有.send()
-调用的简单计数避免了程序设计中的“垃圾错误”
2)如果不使用显式计数,可能会解决“垃圾”整理,使用socket_monitor
工具(为此需要额外的成本)
SUB
-side是消息的被动接收者,因此它只有一个选项:
-使用socket_monitor
工具创建您自己的、特定于应用程序的检测器(分析所有幕后API事件,没有其他方法可以在已发布的API抽象幕后作弊)
问:是否有任何机制可以获取 ZMQ 发布服务器队列的计数/长度?
(见上文)取决于观察员“机制”的立场:
添加另一个独立工作的信令控制平面将有助于这些原则的交叉,从而增强双方的所有相关控制数据,以获得排队业务的全貌。
我正在创建一个API来发布和使用来自RabbitMQ的消息。在我当前的设计中,发布者将发出一个POST请求。我的API会将POST请求路由到Exchange。这样,发布者在发布时就不必知道服务器地址、exchange名称等。 现在消费者部分是我不确定如何继续的地方。 开始时不会排队。当一个新的消费者想要订阅一个主题时,那么我将创建一个队列并将其绑定到交换。我需要一些问题的答案- 一旦我为使用者创建
我需要在发布/订阅模式下调用Kafka消费者1000次。据我所知,为了让kafka在发布/订阅模式下工作,我需要给每个消费者一个新的groupId(props . put(" group . id ",String.valueOf(Instant.now())。toEpochMilli()));).但是当我这样做的时候,如果两个消费线程同时访问消费线程,就会出现问题。这个问题应该怎么解决?
我通读了RxJS文档,并希望确保我理解了< code > subscriber . unsubscribe()和< code > subscriber . complete()之间的区别。 假设我有一个有两个订阅者的可观察对象,订阅者1和订阅者2。如果订阅者1对其订阅调用取消订阅,它将不再接收来自可观察对象的通知,但订阅者2将继续接收它们。 <代码>的文档。complete(): 观察者回调,用于
我想用Java实现各种各样的发布者/订阅者模式,但目前已经没有主意了。 有1个发布者和N个订阅者,发布者发布对象,然后每个订阅者需要按照正确的顺序对每个对象进行一次且仅处理一次。发布者和每个订阅者在自己的线程中运行。 在我最初的实现中,每个订阅者都有自己的阻塞队列,发布者将对象放入每个订阅者的队列中。这可以正常工作,但如果任何订阅者的队列已满,发布者将被阻塞。这会导致性能下降,因为每个订阅者处理对
目前,我已经开始使用ActiveMQ处理JMS主题。我已经通过JAVA代码(如下所述)创建了发布者和持久订阅者,并且在订阅者端也收到了消息。 Publisher.Java 订阅者.java 我对以下主题有一些疑问, 如何检查有多少订阅者使用 Java JMS 在主题中主动查找消息? 如何从主题中获取活动和持久订阅者列表? 我们是否可以删除主题中发布的消息? 在这些情况下帮助我。 提前致谢。
我是新的数据流和发布子工具在GCP。 需要将prem过程中的电流迁移到GCP。 当前流程如下: 我们有两种类型的数据馈送 Full Feed–其adhoc作业–完整XML的大小约为100GB(单个XML–非常复杂的一个–完整的数据–ETL作业处理此XML并将其加载到约60个表中) 单独的ETL作业用于处理完整提要。ETL作业过程完全馈送并创建负载就绪文件,所有表将被截断并重新加载 源系统每30分钟