我是Spring整合的新手。
我们有一个REST应用程序,它接收的消息太多(每分钟6000条消息),超出了数据库的处理能力。所以我想将请求的速率限制为每15秒500条消息(每分钟2000条)。我使用队列通道来实现这一点。
一段时间后,应用程序将创建30000个Java线程。此外,队列通道容纳的消息比队列容量中提到的要多。
如何减少线程数量并限制队列中的消息?
集成上下文xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-5.0.xsd">
<!-- Endpoint -->
<int:gateway service-interface="com.ratelimiter.PrintGateway" default-request-channel="inputChannel">
<int:method name="print"/>
</int:gateway>
<!-- Channel -->
<int:channel id="inputChannel">
<int:queue capacity="30000"/>
</int:channel>
<!-- Endpoint -->
<int:service-activator ref="receiver" input-channel="inputChannel" method="save">
<int:poller fixed-rate="15" time-unit="SECONDS" max-messages-per-poll="500"></int:poller>
</int:service-activator>
<!-- Spring Bean -->
<bean id="receiver" class="com.ratelimiter.saveToDataStore"/>
</beans>
PrintGateway接口:
public interface PrintGateway {
public Future<Message<String>> print(Message<?> message);
}
因为你的网关签名是返回一个未来
默认情况下,它使用
private volatile AsyncTaskExecutor asyncExecutor = new SimpleAsyncTaskExecutor();
这真的为每一条新消息增加了一条新线索。重要的是:它会等待回复,以实现
未来
。根据你的代码,不会有任何回复,因此,你在网关中的线程会等待很长时间。
您应该考虑将网关的签名更改为
当RabbitMq消息到达队列时,我目前正在使用IntegrationFlow来触发作业执行。IntegrationFlow的AmqpInFronChannelAdapter和作业的第一步的ItemReader都配置为从同一队列中读取消息。 我遇到的问题是IntegrationFlow的AmqpInboundChannelAdapter读取RabbitMQ消息,然后ItemReader再也找不到该
我有一个集成应用程序,大部分工作,但注意到昨天一个消息丢失了。当时,service-activatorendpoint正忙于处理先前的消息。 以下是适用于该问题的配置。
在Spring集成中使用出站网关时,我试图在JMS标头中发送回复Q详细信息。我了解到JIRA#INT-97中的增强功能在将Spring消息标头发送到JMS目标之前将其复制到JMS标头。 在将消息发送到出站网关之前,将消息头设置如下。message.getHeader(). setAtcm(JmsTargetAdapter.JMS_REPLY_TO, myReplyDestation); 但是我无法
我正在开发一个简单的REST应用程序,它利用RxJava向远程服务器发送请求(1)。对于REST API的每个传入请求,都会向(1)发送一个请求(使用RxJava和RxNetty)。一切正常,但现在我有了一个新的用例: 为了不让太多的请求轰炸(1),我需要实施速率限制。解决这个问题的一种方法(我假设)是将在向(1)发送请求时创建的每个可观察的(2)添加到另一个执行实际速率限制的(2)中。(2) 然
我需要在我的Spring集成上下文中动态地将消息分配给MessageChannel。当我知道我想要的MessageChannel的名称时,我可以通过从上下文中获取MessageChannel bean来做到这一点。 我需要做的是通过编程查找在ChannelAdapter/服务中设置的消息通道的名称/id。 但是,MessageChannel API没有与之关联的getName()或getId()方
我有一项服务,使用Firebase Cloud Messaging与Android客户端进行通信,使用带有参数设置的FCM数据消息。从有关可折叠密钥的文档中: 当有新消息呈现旧线程时,相关消息与客户端应用程序无关,FCM将替换旧消息。例如,发送到同步或过期的通知消息。 这就是我要找的。我不需要所有的更新,只需要最后一个。但是,如果用户在线,我需要它尽快。 然而,我得到了一个奇怪的速率限制,它不会导