当前位置: 首页 > 知识库问答 >
问题:

spring集成以并发方式处理消息

钮高朗
2023-03-14

我正在尝试使用spring integration设置我的应用程序,作为一名新手,需要以下用例的建议-

有一个队列,来自另一个应用程序的消息将被推送到该队列。我的应用程序使用队列中的消息,进行一些数据处理,然后将其推送到另一个出站队列。目标是以并发方式处理消息。

根据我的理解,我们可以有两种方法-

1.使用#轮询器

<int:service-activator ref="messageProcessor" method="process" input-channel="incomingChannel">
    <int:poller fixed-rate="100" task-executor="executor" />
<int:service-activator/>
<task:executor id="executor" pool-size="10"/>

2.使用#调度器

<int:channel id="incomingChannel">
    <int:dispatcher task-executor="executor"/>
</int:channel>
<int:service-activator ref="messageProcessor" method="process" input-channel="incomingChannel" />

<task:executor id="executor" pool-size="10"/>

从基于轮询器的配置来看,池中似乎有多个可用线程,可以同时获取消息和处理消息。这里的担忧是-

i)轮询器将不必要地在后台运行(资源命中)。

ii)即使消息在信道上,也会根据固定速率浪费毫秒(因为轮询器将在每100秒后获取消息)。

基于调度器的配置似乎是消息驱动的,可能非常适合我的用例。如果我错了,请纠正我,在这种情况下,线程只负责将消息分派给订阅者。

如果是那样的话-

是否需要与通道关联多个服务激活器?

有没有办法动态附加相同服务激活器配置的多个实例?

建议我什么是处理这个问题的最佳方法。谢谢

共有1个答案

王翰墨
2023-03-14

(i) 投票非常轻量级;即使使用固定速率0,线程也会在通道中阻塞1秒(默认情况下-接收超时);我甚至怀疑你会测量空闲轮询器使用的cpu。

(ii)参见(i)-减少轮询间隔将缓解任何等待时间。

在任何情况下,如果您谈论的是JMS,您可能都不想使用这两种方法——如果您希望出站发送参与与入站消息相同的事务(即,只有在发送成功时才提交删除),则不得将其转移到另一个线程——如果您这样做,此时将立即执行消息删除。

相反,使用消息驱动的通道适配器,并使用其并发设置来增加线程数量;然后在整个过程中使用直接通道。

 类似资料:
  • 当RabbitMq消息到达队列时,我目前正在使用IntegrationFlow来触发作业执行。IntegrationFlow的AmqpInFronChannelAdapter和作业的第一步的ItemReader都配置为从同一队列中读取消息。 我遇到的问题是IntegrationFlow的AmqpInboundChannelAdapter读取RabbitMQ消息,然后ItemReader再也找不到该

  • 我需要解决这个场景。我有两个amqp消费者设置来获取一条消息。 taskChannel是queuechannel,但一次只允许使用一条消息,因此没有并行处理。如果另一条消息花了太长时间才继续,我如何在超时后拒绝一条消息。那么这个消息将返回到队列,由另一个节点继续?我的意思是,这两个消费者预取了两条消息,但一次只能处理一条,所以如果第一条预取消息需要很长时间才能处理,那么如何释放第二条预取消息呢。

  • 我试着为下一个spring cloud流版本准备我们的应用程序。(当前使用3.0.0.rc1)。用Kafka的活页夹。 现在我们收到一个消息,处理它,并将它重新发送到另一个主题。单独处理每个消息会导致对数据库的大量单个请求。 在3.0.0版本中,我们希望以批处理的方式处理消息,这样我们就可以在批更新中保存数据。 在当前版本中,我们使用了@enablebinding、@streamlistener

  • 我们一直在使用SI Kafka进行一个新项目,并取得了很大成功。在最近的一次切换之前,我们使用KafkaTopicOffsetManager来管理我们的消费者主题偏移量。为了避免每个消费者/主题对都有额外的主题,并使用Burrow或lag监控,我们决定使用最新的KafkaNativeOffsetManager,它使用Kafka提供的本机偏移管理。但在切换之后,我们注意到目标主题的消息消耗持续滞后。

  • 我正在构建一个调用许多不同web服务的系统,我希望生成一个关于调用ws后返回的所有错误的报告。 为此,我使用了<代码> 组在完成时过期=“true”/ 和激活剂: 谢谢

  • 我对RabbitMQ很陌生,所以如果我的问题听起来很琐碎,请原谅。我想在RabbitMQ上发布消息,它将由RabbitMQ消费者处理。 我的消费者机器是一个多核机器(最好是azure上的工作者角色)。但QueueBasicConsumer一次推送一条消息。我如何编程来利用我可以同时处理多个消息的所有核心。 一种解决方案是在多个线程中打开多个通道,然后在那里处理消息。但在这种情况下,我将如何决定线程