当前位置: 首页 > 知识库问答 >
问题:

如何使用Apache Camel浏览队列中的消息?

楮星鹏
2023-03-14

我需要使用驼峰路由浏览来自活动mq的消息,而不使用这些消息。

JMS队列中的消息将被读取(仅浏览而不使用)并移动到数据库中,同时确保原始队列保持完整。

public class CamelStarter {

   private static CamelContext camelContext;

            public static void main(String[] args) throws Exception {
                            camelContext = new DefaultCamelContext();
                            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_BROKER_URL);

                            camelContext.addComponent("jms",  JmsComponent.jmsComponent(connectionFactory));

                            camelContext.addRoutes(new RouteBuilder() {

                                            @Override
                                            public void configure() throws Exception {
                                                from("jms:queue:testQueue").to("browse:orderReceived") .to("jms:queue:testQueue1");
                                            }

                                            }
                            );

                            camelContext.start();

                            Thread.sleep(1000);

                             inspectReceivedOrders();

                            camelContext.stop();

            }

公共静态无效检查ReceivedOrders(){

BrowsableEndpoint browse = camelContext.getEndpoint("browse:orderReceived", BrowsableEndpoint.class);
List<Exchange> exchanges = browse.getExchanges();
System.out.println("Browsing queue: "+ browse.getEndpointUri() + " size: " + exchanges.size());
for (Exchange exchange : exchanges) {
  String payload = exchange.getIn().getBody(String.class);
  String msgId = exchange.getIn().getHeader("JMSMessageID", String.class);
  System.out.println(msgId + "=" +payload);

}

共有2个答案

吕亮
2023-03-14

据我所知,不可能在骆驼阅读(不消费!)JMS消息:-(我(在JEE应用程序中)找到的唯一解决方法是使用计时器定义启动EJB,持有QueueBrowser,并将消息处理委托给驼峰路由:

@Singleton
@Startup
public class MyQueueBrowser  {

    private TimerService timerService;

    @Resource(mappedName="java:/jms/queue/com.company.myqueue")
    private Queue sourceQueue;

    @Inject
    @JMSConnectionFactory("java:/ConnectionFactory")
    private JMSContext jmsContext;  

    @Inject
    @Uri("direct:readMessage")
    private ProducerTemplate camelEndpoint;


    @PostConstruct
    private void init() {       
        TimerConfig timerConfig = new TimerConfig(null, false);
        ScheduleExpression se = new ScheduleExpression().hour("*").minute("*/"+frequencyInMin);
        timerService.createCalendarTimer(se, timerConfig);
    }


    @Timeout
    public void scheduledExecution(Timer timer) throws Exception {      
        QueueBrowser browser = null;
        try {                       
            browser = jmsContext.createBrowser(sourceQueue);                                           
            Enumeration<Message> msgs = browser.getEnumeration();
            while ( msgs.hasMoreElements() ) { 
                Message jmsMsg = msgs.nextElement(); 
                // + here: read body and/or properties of jmsMsg                                            
                camelEndpoint.sendBodyAndHeader(body, myHeaderName, myHeaderValue);
            }                                                                               
        } catch (JMSRuntimeException jmsException) {
            ...
        } finally {        
            browser.close();
        }
    }


}
周苑博
2023-03-14

Apache camel browse组件正是为此而设计的。在这里查看文档。

因为您没有提供任何其他信息,所以不能再多说了。

假设你有这样的路线

from("activemq:somequeue).to("bean:someBean")  

或者

from("activemq:somequeue).process(exchange -> {})  

你所要做的就是像这样在两者之间放置一个浏览endpoint

from("activemq:somequeue).to("browse:someHandler").to("bean:someBean")   

然后写一个这样的类

@Component
public class BrowseQueue {

  @Autowired
  CamelContext camelContext;

  public void inspect() {
    BrowsableEndpoint browse = camelContext.getEndpoint("browse:someHandler", BrowsableEndpoint.class);
    List<Exchange> exchanges = browse.getExchanges();


    for (Exchange exchange : exchanges) {
      ...... 
    }
  }

}
 类似资料:
  • 问题内容: 我正在使用Java的MQ类编写一个简单的Java应用程序。 现在,我可以浏览远程队列而无需删除存储的消息。 这是阅读周期的代码: 主要问题: 在已读消息行之后,将光标移动到下一条消息之前,如何从队列中删除该消息? 第二个问题: Eclispe警告我,不赞成使用所有用于期权的成本;哪些是正确使用的? 解: 这是我真正想要的解决方案: 这些行必须插入问题代码中 我在这里找到它:http :

  • 我们的环境由3个jboss服务器组成(门户、jms、协调)。 协调服务器托管骆驼路由,该路由具有消耗自队列(SLAQueue)的路由 JMS服务器托管了我们的所有队列 最近,我们发现了一个错误,即托管在JMS服务器上的TaskQueue中的一些消息没有传递到门户服务器上的MDB。由于某些原因,它们被卡住了,当我们重新启动JMS服务器时,卡住的消息被传递 为了进行调查,我们在“org.apache.

  • 我正在尝试查看ActiveMQ(5.11.1)中队列中的所有消息。为此,我使用Hawtio(1.4.51)。我在ActiveMQ中的队列包含790条消息。 我的步骤到现在: 默认情况下,hawtio在ActiveMQ队列中最多显示400条消息。所以我去了我的broker.xml设置并添加了: 这给了我401条信息。 所以我尝试将maxBrowsePageSize=“401”更改为“-1”。令我惊讶

  • 我正在努力寻找一个成熟的例子,说明如何在Spring Boot框架中使用ApacheCamel进行轮询。 我已经看过了:https://camel.apache.org/manual/latest/polling-consumer.html除此之外:https://camel.apache.org/components/latest/timer-component.html但是代码示例不够广泛,我

  • 我正在开发一个使用Apache Camel和JMX活动的小应用程序。非常简单地说,我有一个使用SEDA组件的路由--只有一个消费者--简而言之,它创建自己的线程,并在路由繁忙时对传入的交换进行排队。 我想知道Camel中是否有一些现成的东西允许我这样做,或者我忽略了Hawtio或JConsole中的一些东西。 提前谢了。

  • 我的ActiveMQ消息传递实例(Amazon MQ上的ActiveMQ 5.16.2)使用STOMP。我不能使用JMS队列浏览器,也没有办法“解锁”消息。一旦有消费者从队列中提取该消息,即标记为“未消费”,如这里的文档所述。 假设代理无法更改,我在这里查看的是 JMS 的 REST API 映射,但我没有看到任何模仿 ActiveMQ 管理页面 (JSP) 的endpoint,该endpoint