当前位置: 首页 > 知识库问答 >
问题:

spring集成:AmqpInboundChannelAdapter上的TaskExecutor和MaxConcurrentConsumers

焦博实
2023-03-14

我的spring集成应用程序使用来自RabbitMQ的消息,将它们转换为SOAP消息,并执行web服务请求。

每秒可以从队列中获得许多(10-50)条消息。或者在应用程序重新启动后,RabbitMQ队列中可能有数千条消息。

在并行线程中处理多达10条消息的最佳可能场景是什么(消息排序很好,但不是必需的特性,如果web服务的回答是业务失败,那么失败的消息应该重试直到成功)。

Amqp侦听器不应该从队列中消耗更多的消息,因为任务执行器中可用的线程不忙。我可以在一个通道中定义一个ThreadExecutor,如下所示:

@Bean
public AmqpInboundChannelAdapterSMLCSpec 
amqpInboundChannelAdapter(ConnectionFactory connectionFactory, Queue queue) 
{
  return Amqp.inboundAdapter(connectionFactory, queue);
}

IntegrationFlow integrationFlow = IntegrationFlows
  .from(amqpInboundChannelAdapter)
  .channel(c -> c.executor(exportFlowsExecutor))
  .transform(businessObjectToSoapRequestTransformer)
  .handle(webServiceOutboundGatewayFactory.getObject())
  .get();

或者像这样在AmqpInboundChannelAdapter中定义一个任务执行器,而不在流定义中定义channels任务执行器就足够了:

@Bean
public AmqpInboundChannelAdapterSMLCSpec 
amqpInboundChannelAdapter(ConnectionFactory connectionFactory, Queue queue) 
{
  return Amqp.inboundAdapter(connectionFactory, queue)
             .configureContainer(c->c.taskExecutor(taskExecutor));
}

或者可以像选项1那样为通道定义任务执行器,但还可以在通道适配器上设置maxConcurrentConsumers,如下所示:

@Bean
public AmqpInboundChannelAdapterSMLCSpec 
amqpInboundChannelAdapter(ConnectionFactory connectionFactory, Queue queue) 
{
  return Amqp.inboundAdapter(connectionFactory, queue)
             .configureContainer(c->c.maxConcurrentConsumers(10));
}

共有1个答案

叶德运
2023-03-14

ListenerContainer上配置并发并让所有下游进程发生在来自容器的线程上的最佳实践。这样,当因为线程忙而没有更多的消息从队列中轮询时,您就会得到一个自然的背压。另一方面,由于在侦听器容器之后使用executorchannel释放一个轮询线程,并且当前消息将被当作已用消息进行处理,所以不会导致消息丢失,但是可能会在下游失败。

 类似资料:
  • 问题内容: 我想使用Spring Batch和Spring Integration从数据库导入数据,并将它们写入文件,然后通过ftp将其传输到远程服务器。 但是我想我的问题是我不想为我的表创建域对象。我的查询是随机的,我想要一些可以读取数据并将其写入文件并进行传输的东西。 是否可以在不创建各自的域对象的情况下使用Spring Batch和Integration? 问题答案: 绝对。您可以将JDBC

  • Spring是一个流行的Web框架,它提供易于集成与很多常见的网络任务。所以,问题是,为什么我们需要Spring,当我们有Struts2?Spring是超过一个MVC框架 - 它提供了许多其它好用的东西,这是不是在Struts。例如:依赖注入可以是有用的任何框架。在本章中,我们将通过一个简单的例子来看看如何集成Spring和Struts2一起。 首先,需要添加下列文件到项目的构建路径从Spring

  • 我正在尝试使用几个s-vaadin、jsp等实现一个应用程序。 它使用简单的,但后来我决定使用vaadin作为ui。 我创建了vaadin servlet(Spring的servlet也留下了)。我的看起来像这样 我创建了vaadin组件并根据我的需要对其进行了调整。我使用自动装配进行服务。 也是一个Spring bean。 < code>ProjectRepository是另一个spring b

  • 我正在使用一个Spring Boot+Spring Security OAuth2应用程序,我相信它的灵感来自Dave Syer的示例。应用程序被配置为OAuth2授权服务器,具有使用资源所有者密码凭据流的单个公共客户端。成功的令牌被配置为JWT。 公共Angular客户机向/oauth/token发送一个POST请求,该请求带有包含客户机id和秘密的基本身份验证头(这是让客户机进行身份验证的最简

  • 问题内容: 任何人都可以在此处粘贴简单的步骤来集成Spring Security和CAS,以进行单点登录和单点退出。注意我不需要任何基于角色的访问。我有一个已经与spring security集成的Web应用程序。现在,我尝试使用CAS执行SSO,但是出现此错误 这是我当前的spring security.xml 这是我的web.xml 这是我的Spring-rootcontext.xml 这是我

  • 问题内容: 我有一个原型Storm应用程序,该应用程序读取STOMP流并将输出存储在HBase上。它可以工作,但不是很灵活,我试图与我们的其他应用程序以更一致的方式进行设置,但运气不好,无法确定当前使用Storm的方式。我们使用spring- jms类,但不是在标准spring方法中使用它们,而是在运行时创建它们并手动设置依赖项。 这个项目:https : //github.com/granthe