当前位置: 首页 > 知识库问答 >
问题:

如何在多个线程中执行Spring集成流以并行使用更多Amazon SQS队列消息?

须敏学
2023-03-14

需要帮助吗

我需要创建多个并行执行的sqs队列使用者,但我不知道如何使用Sprint集成来实现这一点

我有以下架构

包含20万条消息的Amazon SQS队列

一个包含5个EC2实例的Amazon堆栈,每个实例都有tomcat服务器,运行一个Spring Boot应用程序,该应用程序具有Spring集成流,该集成流使用Spring集成aws的SQS消息驱动通道适配器来消费SQS的消息(https://github.com/spring-projects/spring-integration-aws)

并将消息发布到平均响应时间为1秒的REST服务(我无法修改REST服务,但我可以并行发送消息)

SQS队列-

限制Amazon SQS允许客户端批量读取最多10条消息的请求,但我可以有多个客户端并行使用更多的消息。

在Amazon SQS中,需要手动删除消息。这是使用spring integration完成的,我只在REST服务返回OK时删除消息。

我对可能的重复没有问题(SQS向两个不同的客户端发送相同的消息)

我无法在Spring Boot应用程序中以任何方式存储消息

我的Spring整合流程

<aws-messaging:sqs-async-client id="clientWithCredentials"/>
<int-aws:sqs-message-driven-channel-adapter
  sqs="clientWithCredentials" 
  channel="channel_1"
  queues="https://sqs.us-east-1.amazonaws.com/123456789000/SomeAmazonSQSName"
  max-number-of-messages="10"/>

<int:channel id="channel_1" />
<int:outbound-channel-adapter ref="restService" method="publish" channel="channel_1" />

如何在多个线程中并行执行此流,以并行地消耗更多消息?

我试着把


共有1个答案

金嘉
2023-03-14

为了达到这样的要求,您可以使用executor频道,而不是默认的DirectChannel

这样,所有的SQS消息都将被分发到执行器通道提供的线程中,因此并行执行。

有关执行器频道的更多信息,请参阅参考手册。

使现代化

所以,我的建议应该反映在您当前的配置中,比如:

<int:channel id="channel_1">
   <int:dispatcher task-executor="someExecutor"/>
</int:channel>

使现代化

如果你仍然坚持要几个SQS适配器,那么简化版本是这样的:

<int-aws:sqs-message-driven-channel-adapter
    sqs="sqsAsyncClient" 
    channel="sqs-to-metricator"
    queues="https://sqs.us-east-1.amazonaws.com/123/SomeSQSQueueName"
    max-number-of-messages="10"
    />


<int-aws:sqs-message-driven-channel-adapter
    sqs="sqsAsyncClient" 
    channel="sqs-to-metricator"
    queues="https://sqs.us-east-1.amazonaws.com/123/SomeSQSQueueName"
    max-number-of-messages="10"
    />

<int-aws:sqs-message-driven-channel-adapter
    sqs="sqsAsyncClient" 
    channel="sqs-to-metricator"
    queues="https://sqs.us-east-1.amazonaws.com/123/SomeSQSQueueName"
    max-number-of-messages="10"
    />

<int:channel id="sqs-to-metricator" />

<int:outbound-channel-adapter ref="restService"
    method="publish" channel="sqs-to-metricator" />

为了避免重复,您可以考虑切换到java DSL,并开始使用其<代码> ITNGRESATORACTION上下文<代码> >动态<代码>集成流< /代码>注册:https://docs.spring.io/spring-integration/docs/5.0.4.RELEASE/reference/html/java-dsl.html#java-dsl运行时流

 类似资料:
  • 默认情况下,C++容器应该是线程安全的。我必须错误地使用多线程,因为对于此代码: 我得到了:

  • 问题内容: 我有两个分开的阻塞队列。客户端通常使用第二个阻塞队列中的第一个来检索要处理的元素。 在某些情况下,客户端对两个阻塞队列中的元素感兴趣,无论哪个队列首先提供数据。 客户端如何并行等待两个队列? 问题答案: 您可以尝试在某种循环中使用该方法,以仅在指定时间量内等待一个队列,然后再轮询另一个队列。 除此之外,我会说在另一个线程上为每个队列运行阻塞操作并为您的主应用程序提供回调接口是另一个稍微

  • 我想创建一个异步读取kafka消息的流,并使用队列通道累积大量要处理的消息,并且只有在处理完这些消息(例如50条消息)后,它才能处理另外50条消息,或者在释放队列中的空间时。我尝试使用一个从kafka委托读取到另一个流的流,该流具有一个带有PollerMetadata(Pollers.fixedDelay(500))的QueueChannel。maxMessagesPerPoll(50))但是轮询

  • 问题内容: 我有以下方法: 在这里,我依次调用三种方法,这依次命中数据库并获取我的结果,然后对从数据库命中获得的结果进行后处理。我知道如何通过使用并发调用这三种方法。但是我想用Java 8 来实现。有人可以指导我如何通过并行流实现相同目标吗? 编辑 我只想通过Stream并行调用方法。 问题答案: 您可以利用这种方式:

  • 问题内容: 我有一个问题,那就是Django可以执行多线程工作吗? 这是我要执行的操作:单击网页上的按钮,然后model.py中开始运行某些功能,例如,从Internet上爬网一些数据,完成后它将返回给用户结果。 我想知道我必须打开一个新线程来执行model.py中的功能,有人可以告诉我该怎么做吗?非常感谢你。 问题答案: 是的,它可以多线程,但是通常使用Celery来完成。你可以在celery-

  • 6.7.2.多线程执行 与在单线程中阻塞相比,更好的做法是让程序运行在多个线程之中。系统负责分配CPU时间,几个线程仿佛在同一时刻同时运行。这样可以避免某线程独占计算资源。 图6.10. 多线程执行 在例子中,我们将网络操作的相关代码放到独立的线程里面。这样我们的主线程可以避免阻塞在网络操作上,用户界面不会响应不灵。按惯例,我们一般认为主线程是运行于前台,而其它的线程都是运行于后台。这是因为前端的