当前位置: 首页 > 知识库问答 >
问题:

RabbitMQ骆驼消费者-使用单个消息

龚迪
2023-03-14

我有一个场景,我想“拉”RabbitMQ队列/主题的消息,并一次处理一个。特别是当消费者启动时,队列中已经有消息。我尝试了以下方法,但没有成功(这意味着,这些选项中的每一个都会读取队列,直到队列为空,或者直到另一个线程关闭上下文)。

1.第一次处理后立即停止路由

final CamelContext context = new DefaultCamelContext();
try {
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            RouteDefinition route = from("rabbitmq:harley?queue=IN&declare=false&autoDelete=false&hostname=localhost&portNumber=5672");
            route.process(new Processor() {
                Thread stopThread;

                @Override
                public void process(final Exchange exchange) throws Exception {
                    String name = exchange.getIn().getHeader(Exchange.FILE_NAME_ONLY, String.class);
                    String body = exchange.getIn().getBody(String.class);

                    // Doo some stuff
                    
                    routeComplete[0] = true;
                    if (stopThread == null) {
                        stopThread = new Thread() {
                            @Override
                        public void run() {
                                try {
                                    ((DefaultCamelContext)exchange.getContext()).stopRoute("RabbitRoute");
                                } catch (Exception e) {}
                            }
                        };
                    }
                    stopThread.start();
                }
            });
        }
    });
    context.start();
    while(!routeComplete[0].booleanValue())
        Thread.sleep(100);

    context.stop();
}

与1类似,但使用闩锁而不是while loop和sleep。

使用轮询消费者

 final CamelContext context = new DefaultCamelContext();
 context.start();
 Endpoint re = context.getEndpoint(srcRoute);
 re.start();
 try {
     PollingConsumer consumer = re.createPollingConsumer();
     consumer.start();
     Exchange exchange = consumer.receive();
     String bb = exchange.getIn().getBody(String.class);
     consumer.stop();
 } catch(Exception e){
     String mm = e.getMessage();
 }

使用ConsumerTemplate()-类似于上面的代码。

我还尝试过启用预回迁,并将最大交换次数设置为1。

这些似乎都不起作用,如果队列中有3条消息,则在我能够停止路由之前,所有消息都已被读取。如果我要使用标准的RabbitMQ Java API,我会使用basicGet()调用来读取单个消息,但出于其他原因,我更喜欢使用骆驼消费者。

是否有人能够成功地使用Camel RabbitMQ消费者处理包含多个消息的队列上的单个消息?

谢谢

共有1个答案

池麒
2023-03-14

这并不是组件的主要意图,因为它需要继续接收。但我已经创建了一个票证来研究如何支持basicGet(单一接收)。3.8版之后将推出一个新的基于spring的rabbitmq组件,因此它将在那里实现(首先):https://issues.apache.org/jira/browse/CAMEL-16048

 类似资料:
  • 我在JPA上遇到了以下问题,但这可能更像是一个关于骆驼的概念问题。 我需要一个基于cron的石英消费者。但如果触发了,我想选择JPA组件作为第一步。 但是如果我用“to”调用JPA组件,那么它被用作生产者,而不是消费者。我可以以某种方式使用JPA组件来处理这个问题吗,或者我必须遵循服务激活器(基于bean的)逻辑并将JPA组件留在后面? 提前谢谢你,葛格利

  • 我有一个从JMS队列读取项目并将其写入数据库的路径。 我已经阅读了关于ApacheCamelJMS组件的文档,但我没有得到我的问题的确切和明确的答案,即“如果路由中出现异常,JMS消费者是否会重新插入项目或解锁JMS队列中的消息?”。 谢谢 阿里

  • 我对骆驼生产商有很好的了解,但我不能对各种骆驼消费者保持清醒的头脑。特别是事件驱动消费者和轮询消费者,camel如何知道为这些消费者调用回调? 消费者的一般流量是多少?

  • 我已经和ApacheCamel合作了一段时间,做了一些基本的工作,但现在我正在尝试创建一个路由,在该路由中,我可以让多个“消费者”访问同一条路由,或者在路由中添加一个消费者,然后处理消息。 我的想法是拥有一个由事件触发的事件驱动消费者,然后例如从ftp读取文件。我正计划做这样的事情: 所以这个想法是我有一个事件(例如直接或来自消息队列),它具有“fileName”属性,然后使用该属性从ftp下载/

  • 我在为 端口设置 消费者以捕获消息时遇到问题。我的: 申请开始: 并且<code>514</code>端口已打开但未侦听 我可以在tcpdump中看到,tcpdump-I eth1-nn-A-s 0端口514和udp正确发送和接收消息。 有人能告诉我我做错了什么吗?

  • 我一直在尝试为Spring引导Kafka骆驼Avro消费者寻找示例代码,但没有运气。我在以下URL找到了Spring Camel Kafka消费者和生产者示例: https://thysmichels.com/2015/09/04/apache-camel-kafka-spring-integration/ 我的具体问题是,一旦我的bean从Avro模式创建,并且我有了POJO类,我如何将上面的c