当前位置: 首页 > 知识库问答 >
问题:

使用Camel RoutePolicy设置sqs的高优先级/低优先级路由使用者的优雅方法?

严峰
2023-03-14

我有两个SQS队列:一个用于低优先级,另一个用于高优先级消息。逻辑是不要接触低优先级队列上的消息,除非高优先级队列为空。

SuspendLowPriorityRoutePolicy suspendLowPriorityRoutePolicy = new SuspendLowPriorityRoutePolicy(LOW_PRIORITY_ROUTE_ID, camelContext);

        from(UriBuilder.buildSqsUri(sqsProperties)).routeId(LOW_PRIORITY_ROUTE_ID)
                .log(LoggingLevel.INFO, log, "NON-PRIORITY: ${body}");

        from(UriBuilder.buildPrioritySqsUri(sqsProperties)).routeId(HIGH_PRIORITY_ROUTE_ID)
                .routePolicy(suspendLowPriorityRoutePolicy)
                .log(LoggingLevel.INFO, log, "PRIORITY: ${body}");

现在,我将这两条路由设置为同时使用队列中的消息。我想要的是,一个消息进入高优先级路由触发低优先级路由的停止。为了尝试获得此功能,我尝试使用一种路由策略,当在高优先级路由上启动新交换时,该策略将停止低优先级队列:

(来自SuspendLowPriorityRoutePolicy的片段)

 @Override
    public void onExchangeBegin(Route route, Exchange exchange) {
        Route lowPriorityRoute = context.getRoute(lowPriorityRouteId);
        ServiceStatus routeStatus = context.getRouteStatus(lowPriorityRouteId);
        if (!routeStatus.isStopped()) {
            try {
                lock.lock();
                log.info("High priority request came in, stopping consumer");
                stopConsumer(lowPriorityRoute.getConsumer());
            } catch (Exception e) {
                log.error("Exception stopping consumer " + e);
                handleException(e);
            } finally {
                lock.unlock();
            }
        }
    }

然而,我不确定如何重新启动低优先级的消费者。骆驼routepolicy提供的其他钩子允许重写onExchangeDone,但此时逻辑应该是只有当高优先级队列为空时才重新启动低优先级使用者。我认为没有一种方法可以检查队列是否为空,我们可以检查exchange done钩子上的ExampiateNumberOfMessages属性,但这可能是不准确的。

另一种想法是有一个调度的后台轮询器,该轮询器检查CamelContext的inFlightRequestsRepository,并且只有在高优先级路由没有飞行请求时才重新启动低优先级队列。

共有1个答案

柴坚诚
2023-03-14

好吧,我能想到的最简单的方法是,用比低优先级消息多得多的消费者来消费高优先级消息。这样,高优先级的消息通常会很快被使用,而低优先级的消息可能会堆积,因此必须等待。

然而,低优先级消费者仍然会处理消息,无论同时处理的高优先级消息有多少。

更接近您的用例的一点是使用JMS优先级。

不过,有一些事情要记住:

  • 使用消息优先级根据需要重新排序队列中的消息。这会降低包含大量消息的队列的性能。
  • 消息优先级可能必须在代理或队列上激活。否则,轻重缓急就会被忽略。与您的代理一起搜索优先级示例以检查此内容。
  • 我不确定,但我认为JMS连接上也有一些标志来尊重使用者的优先级
  • 您必须保持使用者预取为低电平。如果您的使用者预取了1000条低优先级的消息,它会在执行其他操作之前处理所有这些消息。无论同时是否有高优先级消息到达队列。预取越大,在处理新到达的高prio消息之前,可以处理更多的低prio消息。
 类似资料:
  • 我需要一个优先级队列,它首先获得具有最高优先级值的项目。我当前正在使用队列库中的PriorityQueue类。但是,这个函数只先返回值最小的项。我尝试了一些很难看的解决方案,比如(sys.maxint-priority)作为优先级,但我只是想知道是否存在更优雅的解决方案。

  • 我有两个拥有相同消费者的AMQ队列。第一个队列(Q1)处理97%的消息,另一个队列(Q2)仅处理3%。问题是,Q2中的消息需要在消息排队后立即进行处理。所以我的问题是,当一条消息在第二季度可用时,我需要以某种方式暂停第一条路线,以吸引消费者。apache camel路由如下所示: 应该使用什么策略?我不认为我可以使用重新定序器,因为Q1可能有成千上万的消息排队,我不能把所有的消息都放在重新定序器批

  • 在我的python应用程序中,我使用芹菜作为任务生产者和消费者,使用RabbitMQ作为代理。现在,我正在实施优先级排序。起初,它看起来根本不起作用,因为根据文档,我刚刚在队列中添加了参数。我更深入地研究了一下,发现了另一种优先级——消费者优先级和任务优先级。所以,现在,看起来有三种不同的优先顺序,我完全困惑了。你能给我解释一下区别吗? 队列最大优先级:即https://www.rabbitmq.

  • 问题内容: 我的网页包含: 引用的样式表包含: 我在ID中有一张表格,希望单元格有一些填充。但是,引用的样式表优先于内联样式。我可以通过Firebug直观地看到这一点。如果我关闭Firebug中的指令,则向左填充将生效。 我该如何上班? 问题答案: 正如其他人提到的那样,您有一个特异性问题。当确定两个规则中的哪一个优先时,CSS引擎会计算每个选择器中的s 数量。如果一个比另一个多,就使用它。否则,

  • 代码生成器用于将Blockly的程序转换为JavaScript,Python,PHP,Lua,Dart等。在为新块编写代码生成器时,最具挑战性的问题是处理操作顺序,以使生成的代码按预期执行。 圆括号 考虑下面的块组装。 如果生成器不知道运算符的优先级,则生成的JavaScript代码将是: alert(2*3+4); 这显然是不正确的,因为乘法运算符会撕裂加法,自己获取“ 3”。一种解决方案是将每

  • 事件的吞没中,我们提到了事件的传递。事件如何传递,先到哪个监听器?这是由优先级决定的。 固定值优先级 使用一个整形的数值,数值较低的监听器比数值较高的监听器,先接收到事件。 场景图优先级 是指向节点对象的指针,z-order 较高的节点中的监听器比 z-order 较低的节点中的,先接收到事件。由于 z-order 较高的节点在顶部绘制,所以使用这种优先级可以确保触摸事件被正确响应 还记得这个场景