当前位置: 首页 > 知识库问答 >
问题:

Camel消费单个消息并停止交易

蓝苗宣
2023-03-14

我正尝试使用Camel以事务方式从JMS队列中消费一条消息。特别是在这样的流程中:

  1. 等待消息在JMS队列上发布
  2. 尝试消费和处理单个消息
  3. 如果处理失败(发生异常),回滚消耗
  4. 如果处理通过,确认并停止使用更多消息
  5. 在应用程序生命周期的后期,另一个进程触发消费从(1)重新开始

起初,我试图使用轮询消费者,使用ConsumerTemplate来做这件事,但是我不知道是否可以通过事务来做这件事——似乎事务是ConsumerTemplate的内部事务,所以不管我做什么,当ConsumerTemplate返回时,消息已经被确认为被消费了。

我可以使用消费者模板做到这一点吗?我可以使用骆驼做到这一点吗?如果可以,最好的方法是什么(简单的例子将不胜感激)?

共有1个答案

邓丰
2023-03-14

我最终使用了polEnrich dsl来实现这一点。例如,我的路由生成器如下所示:

from("direct:service-endpont").transacted("PROPOGATION_REQUIRED").setExchangePattern(ExchangePattern.InOut).pollEnrich("activemq:test-queue").bean(myHandler);

我将直接endpoint用作服务,向直接endpoint发送“请求”消息会轮询 jms 队列以获取单个消息(如果需要,则阻止)。启动的事务扩展到 pollEnrich 因此,例如,如果 myHandler bean 失败,则在 pollEnrich 期间获取的消息不会被使用并保留在队列中。

 类似资料:
  • 我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认

  • 我的应用程序使用来自RabbitMQ的一些消息并对其进行处理。我有大约10个队列,每个队列最多有10个消费者(线程)。我有5次预回迁。我正在Heroku中使用CloudAMQP插件(RabbitMQ作为服务)运行安装程序。 我使用默认心跳和连接超时设置(60秒)运行。 我的java应用程序是一个使用sping-Rabbit库的Spring Boot应用程序。 版本: 问题是对于一个特定队列的消费者

  • 我想这个话题发生了什么...偏移坏了还是我不知道... 有人知道会发生什么吗?谢谢

  • 有什么方法可以阻止Kafka的消费者在一段时间内消费信息吗?我希望消费者停止一段时间,然后开始消费最后一条未消费的消息。

  • 主要内容:1 start启动服务定时清理过期消息,1.1 cleanExpireMsg清理过期消息,1.2cleanExpiredMsg清理过期消息,2 submitConsumeRequest提交消费请求,2.2 submitConsumeRequestLater延迟提交,2.2 consumeMessageBatchMaxSize和pullBatchSize,3 ConsumeRequest执行消费任务,,,,基于RocketMQ release-4.9.3,深入的介绍了ConsumeMes

  • 我有以下场景:有3个rabbitmq队列,生产者根据消息的优先级将消息推送到这些队列。(myqueue_high,myqueue_medium,myqueue_low)我希望有一个可以按顺序或优先级从这些队列中提取的单一使用者,即只要消息在那里,它就一直从高队列中提取。它是从介质中拉出来的。如果medium也是空的,它从Low拉出。 我如何实现这一点?我需要编写自定义组件吗?