当前位置: 首页 > 知识库问答 >
问题:

骆驼消费者在关机时仍会收到新消息

壤驷鸿祯
2023-03-14

我使用的是Camel 2.16.1。关闭后,骆驼的消费者仍然接受新消息。有没有办法迫使消费者立即停止消费。这里也有同样的问题:驼峰关机策略:飞行中的信息不会减少

我为这个问题创建了一个测试用例:

    public class GracefulShutdownTest extends CamelTestSupport {

    public class ThreadStopContext implements Runnable {
        private CamelContext context;

        public ThreadStopContext(CamelContext context) {
            this.context = context;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(1000);
                System.out.println("------start shutdown....");
                context.stop();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public class Sender implements Runnable {
        private ProducerTemplate template;

        public Sender(ProducerTemplate template) {
            this.template = template;
        }

        @Override
        public void run() {
            template().sendBody("direct:start", "test");
        }
    }

    @Test
    public void shutdownTest() throws Exception {
        context().setTracing(true);
        getMockEndpoint("mock:endpoint").expectedMessageCount(0);
        getMockEndpoint("mock:deadletterEndpoint").expectedMessageCount(1);
        Thread thread = new Thread(new ThreadStopContext(context()));
        thread.start();
        Thread threadSender1 = new Thread(new Sender(template()));
        threadSender1.start();
        Thread.sleep(2000);
        System.out.println("-------------------second message-----------------------------");
        Thread threadSender2 = new Thread(new Sender(template()));
        threadSender2.start();
        assertMockEndpointsSatisfied();

    }

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                errorHandler(deadLetterChannel("direct:deadletter")
                        .logHandled(true));

                from("direct:start").process(new Processor() {
                    @Override
                    public void process(Exchange exchange) throws Exception {
                        System.out.println("----------start sleep....");
                        Thread.sleep(5000);
                        System.out.println("----------end sleep....");
                        throw new Exception();
                    }
                });

                from("direct:deadletter")
                        .startupOrder(1)
                        .process(new Processor() {
                    @Override
                    public void process(Exchange exchange) throws Exception {
                        System.out.println("----message at deadletter----");
                    }
                })
                        .to("mock:deadletterEndpoint");
            }
        };
    }
}

运行测试用例时,我们可以看到机上交换的数量在开始优雅关闭后增加:

Waiting as there are still 1 inflight and pending exchanges to complete
.....
Waiting as there are still 2 inflight and pending exchanges to complete

共有1个答案

斜单鹗
2023-03-14

在阅读Defaultshutdownstrategy类的源代码后,我发现当关闭消费者时,Camel更喜欢挂起而不是关闭。这意味着,如果消费者支持suspend,它将执行suspend方法,然后让消费者将列表延迟到稍后关闭。一些消费者(direct、RabbitMQ)有空的suspend()方法,然后他们仍然像在延迟模式下一样获得新消息

 类似资料:
  • 我在JPA上遇到了以下问题,但这可能更像是一个关于骆驼的概念问题。 我需要一个基于cron的石英消费者。但如果触发了,我想选择JPA组件作为第一步。 但是如果我用“to”调用JPA组件,那么它被用作生产者,而不是消费者。我可以以某种方式使用JPA组件来处理这个问题吗,或者我必须遵循服务激活器(基于bean的)逻辑并将JPA组件留在后面? 提前谢谢你,葛格利

  • 我有一个从JMS队列读取项目并将其写入数据库的路径。 我已经阅读了关于ApacheCamelJMS组件的文档,但我没有得到我的问题的确切和明确的答案,即“如果路由中出现异常,JMS消费者是否会重新插入项目或解锁JMS队列中的消息?”。 谢谢 阿里

  • 我对骆驼生产商有很好的了解,但我不能对各种骆驼消费者保持清醒的头脑。特别是事件驱动消费者和轮询消费者,camel如何知道为这些消费者调用回调? 消费者的一般流量是多少?

  • 我有一个场景,我想“拉”RabbitMQ队列/主题的消息,并一次处理一个。特别是当消费者启动时,队列中已经有消息。我尝试了以下方法,但没有成功(这意味着,这些选项中的每一个都会读取队列,直到队列为空,或者直到另一个线程关闭上下文)。 1.第一次处理后立即停止路由 与1类似,但使用闩锁而不是while loop和sleep。 使用轮询消费者 使用ConsumerTemplate()-类似于上面的代码

  • 我在为 端口设置 消费者以捕获消息时遇到问题。我的: 申请开始: 并且<code>514</code>端口已打开但未侦听 我可以在tcpdump中看到,tcpdump-I eth1-nn-A-s 0端口514和udp正确发送和接收消息。 有人能告诉我我做错了什么吗?

  • 我一直在尝试为Spring引导Kafka骆驼Avro消费者寻找示例代码,但没有运气。我在以下URL找到了Spring Camel Kafka消费者和生产者示例: https://thysmichels.com/2015/09/04/apache-camel-kafka-spring-integration/ 我的具体问题是,一旦我的bean从Avro模式创建,并且我有了POJO类,我如何将上面的c