当前位置: 首页 > 知识库问答 >
问题:

使用Spring集成对消息进行速率限制

叶文博
2023-03-14

我是Spring整合的新手。

我们有一个REST应用程序,它接收的消息太多(每分钟6000条消息),超出了数据库的处理能力。所以我想将请求的速率限制为每15秒500条消息(每分钟2000条)。我使用队列通道来实现这一点。

一段时间后,应用程序将创建30000个Java线程。此外,队列通道容纳的消息比队列容量中提到的要多。

如何减少线程数量并限制队列中的消息?

集成上下文xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:int="http://www.springframework.org/schema/integration"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-5.0.xsd">

    <!-- Endpoint -->   
    <int:gateway service-interface="com.ratelimiter.PrintGateway" default-request-channel="inputChannel">
        <int:method name="print"/>  
    </int:gateway>

    <!-- Channel -->
    <int:channel id="inputChannel">
        <int:queue capacity="30000"/>
    </int:channel>

    <!-- Endpoint -->   
    <int:service-activator ref="receiver" input-channel="inputChannel" method="save">
        <int:poller fixed-rate="15" time-unit="SECONDS" max-messages-per-poll="500"></int:poller>
    </int:service-activator>

    <!--  Spring Bean -->
    <bean id="receiver" class="com.ratelimiter.saveToDataStore"/>

</beans>

PrintGateway接口:

public interface PrintGateway {

    public Future<Message<String>> print(Message<?> message);
}

共有1个答案

哈宪
2023-03-14

因为你的网关签名是返回一个未来

默认情况下,它使用

private volatile AsyncTaskExecutor asyncExecutor = new SimpleAsyncTaskExecutor();

这真的为每一条新消息增加了一条新线索。重要的是:它会等待回复,以实现未来。根据你的代码,不会有任何回复,因此,你在网关中的线程会等待很长时间。

您应该考虑将网关的签名更改为 Value返回类型。这样你真的会发送并忘记。不会有任何免费的背景额外线程。

 类似资料:
  • 当RabbitMq消息到达队列时,我目前正在使用IntegrationFlow来触发作业执行。IntegrationFlow的AmqpInFronChannelAdapter和作业的第一步的ItemReader都配置为从同一队列中读取消息。 我遇到的问题是IntegrationFlow的AmqpInboundChannelAdapter读取RabbitMQ消息,然后ItemReader再也找不到该

  • 我有一个集成应用程序,大部分工作,但注意到昨天一个消息丢失了。当时,service-activatorendpoint正忙于处理先前的消息。 以下是适用于该问题的配置。

  • 在Spring集成中使用出站网关时,我试图在JMS标头中发送回复Q详细信息。我了解到JIRA#INT-97中的增强功能在将Spring消息标头发送到JMS目标之前将其复制到JMS标头。 在将消息发送到出站网关之前,将消息头设置如下。message.getHeader(). setAtcm(JmsTargetAdapter.JMS_REPLY_TO, myReplyDestation); 但是我无法

  • 我正在开发一个简单的REST应用程序,它利用RxJava向远程服务器发送请求(1)。对于REST API的每个传入请求,都会向(1)发送一个请求(使用RxJava和RxNetty)。一切正常,但现在我有了一个新的用例: 为了不让太多的请求轰炸(1),我需要实施速率限制。解决这个问题的一种方法(我假设)是将在向(1)发送请求时创建的每个可观察的(2)添加到另一个执行实际速率限制的(2)中。(2) 然

  • 我需要在我的Spring集成上下文中动态地将消息分配给MessageChannel。当我知道我想要的MessageChannel的名称时,我可以通过从上下文中获取MessageChannel bean来做到这一点。 我需要做的是通过编程查找在ChannelAdapter/服务中设置的消息通道的名称/id。 但是,MessageChannel API没有与之关联的getName()或getId()方

  • 我有一项服务,使用Firebase Cloud Messaging与Android客户端进行通信,使用带有参数设置的FCM数据消息。从有关可折叠密钥的文档中: 当有新消息呈现旧线程时,相关消息与客户端应用程序无关,FCM将替换旧消息。例如,发送到同步或过期的通知消息。 这就是我要找的。我不需要所有的更新,只需要最后一个。但是,如果用户在线,我需要它尽快。 然而,我得到了一个奇怪的速率限制,它不会导