当前位置: 首页 > 知识库问答 >
问题:

Apache Camel在一个ActiveMQ实例上使用多个队列

周峻
2023-03-14

我有一个ActiveMQ服务器的实例,它运行着几个数据源,如果消息消费失败,它会将数据推送到两个队列中,一个是DLQ。我使用Apache Camel来消费和处理来自这些队列的消息,并想将其写入InfxDB。

然而,到目前为止,我未能让堆栈运行,以使Apache Camel并行消耗所有队列。我总是遇到这种错误:

ERROR 20831 --- [r[ActiveMQ.DLQ]]
c.c.j.DefaultJmsMessageListenerContainer : Could not refresh JMS
Connection for destination 'ActiveMQ.DLQ' - retrying using
FixedBackOff{interval=5000, currentAttempts=270,
maxAttempts=unlimited}. Cause: Broker: localhost - Client: Influx
Message Queue already connected from tcp://ip-of-machine-running-route:port

一个Apache Camel实例如何使用多个队列?

我尝试了两种方法:

  • 每个路由活动MQ队列 -

目前,我的代码如下所示:

Camel配置

@Configuration
public class CamelConfig {

    @Bean
    public ShutdownStrategy shutdownStrategy() {
        MessageLogger.logInfo(getClass(), "Camel Route: STARTING...",
                Thread.currentThread().getStackTrace()[0].getMethodName());
        DefaultShutdownStrategy strategy = new DefaultShutdownStrategy();
        int                     timeout  = 1200;
        MessageLogger.logInfo(getClass(), "Camel Route: Timeout for shutdown: " + timeout + " seconds.",
                Thread.currentThread().getStackTrace()[0].getMethodName());
        strategy.setTimeout(timeout); // TODO make it configurable
        return strategy;
    }
}

ActiveMQ客户端配置

@Configuration
public class ActiveMqClientConfig {

    @Bean
    public ActiveMQConnectionFactory registerActiveMQConnectionFactory() {
        MessageLogger.logInfo(getClass(), "ActiveMQ Listener: STARTING...",
                Thread.currentThread().getStackTrace()[0].getMethodName());
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL("tcp://servername:61616");
        connectionFactory.setUserName(username);
        connectionFactory.setPassword(passwd);
        connectionFactory.setUseAsyncSend(false);
        connectionFactory.setClientID("Influx Message Queue");
        connectionFactory.setConnectResponseTimeout(300);
        MessageLogger.logInfo(getClass(), "ActiveMQ Listener: STARTED",
                Thread.currentThread().getStackTrace()[0].getMethodName());
        return connectionFactory;
    }
}

流入配置

@Configuration
public class InfluxDBClientConfig {

    @Bean
    public InfluxDbOkHttpClientBuilderProvider registerInfluxDbOkHttpClientBuilderProvider() {
        return () -> {
            MessageLogger.logInfo(getClass(), "InfluxDB Client: STARTING...",
                    Thread.currentThread().getStackTrace()[0].getMethodName());
            Builder builder = new OkHttpClient.Builder() //
                    .readTimeout(1200, TimeUnit.SECONDS) //
                    .writeTimeout(1200, TimeUnit.SECONDS) //
                    .connectTimeout(1200, TimeUnit.SECONDS) //
                    .retryOnConnectionFailure(true);
            MessageLogger.logInfo(getClass(), "InfluxDB Client: STARTED - " + builder.toString(),
                    Thread.currentThread().getStackTrace()[0].getMethodName());
            return builder;
        };
    }
}

具有多个路由的组件:

@Component
public class ActiveMqToInfluxRoute extends RouteBuilder {
    @Autowired
    private FrameworkConfig frameworkConfig;

    @Override
    public void configure() throws Exception {
        String consumerQueueq = "activemq:queue:queue1?"               //
                + "brokerURL=tcp://ip:port";
        String consumerActiveMqDLQ    = "activemq:queue:ActiveMQ.DLQ?"                     //
                + "brokerURL=tcp://ip:port";
        String consumerQueue2           = "activemq:queue:queue2?"                     //
                + "brokerURL=tcp://ip:port";
        String emitterInfluxDB        = "influxdb://influxDb?databaseName=databaseName"          
                + "&batch=true"                                                            //
                + "&retentionPolicy=retentionPolicy"
        String emitterStreamOut       = "stream:out";

        //************************************************************************
        // Data from cryring_db_inbound to InfluxDB
        //************************************************************************
        from(consumerCryringInbound) //   
                .process(messagePayload -> {
                    Message message = messagePayload.getIn();
                    if (message.getBody(String.class).toString().startsWith("@MultiRecords")) {
                        Processor.processMessage(message.getBody(String.class), message);
                    } else {
                        Processor.processMessage(message);
                    }
                })//
                .to(emitterInfluxDB) //
                .onException(Exception.class) //
                .useOriginalMessage() //
                .handled(true) //
                .log("error") //
                .to(emitterStreamOut);

        //************************************************************************
        // Data from cryring_db_inbound to InfluxDB
        //************************************************************************
        from(consumerActiveMqDLQ) //
                .process(messagePayload -> {
                    Message message = messagePayload.getIn();
                    if (message.getBody(String.class).toString().startsWith("@MultiRecords")) {
                        Processor.processMessage(message.getBody(String.class), message);
                    } else {
                        Processor.processMessage(message);
                    }
                })//
                .to(emitterInfluxDB) //
                .onException(Exception.class) //
                .useOriginalMessage() //
                .handled(true) //
                .log("error") //
                .to(emitterStreamOut);

        //************************************************************************
        // Data from olog_inbound to olog
        //************************************************************************
        from(consumerOlog) //
                .process(messagePayload -> {
                    System.out.println(messagePayload.getIn());
                }) //
                .to(emitterStreamOut);
    }
}

共有2个答案

薛博艺
2023-03-14

与 Camel 多个消费者实现问题建议的(一个@Component中有多个 from().to() 路由)相反,我设法通过将路由拆分为单独的组件来完成这项工作,每个组件都有一个单独的 clientId。此外,我用UUID替换了ActiveMQ配置中的静态客户端ID。法典:

ActiveMQ-Config:

@Configuration
public class ActiveMqClientConfig {

    @Bean
    public ActiveMQConnectionFactory registerActiveMQConnectionFactory() {
        MessageLogger.logInfo(getClass(), "ActiveMQ Listener: STARTING...",
                Thread.currentThread().getStackTrace()[0].getMethodName());
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL("tcp://servername:port");
        connectionFactory.setUserName(username);
        connectionFactory.setPassword(password);
        connectionFactory.setUseAsyncSend(false);
        connectionFactory.setClientID(UUID.randomUUID().toString());
        connectionFactory.setConnectResponseTimeout(300);
        MessageLogger.logInfo(getClass(), "ActiveMQ Listener: STARTED",
                Thread.currentThread().getStackTrace()[0].getMethodName());
        return connectionFactory;
    }
}

路由组件:

@Component
public class ActiveMqToInfluxRoute extends RouteBuilder {
    @Autowired
    private FrameworkConfig frameworkConfig;

    @Override
    public void configure() throws Exception {
        String consumerCryringInbound = "activemq:queue:queue1?"
                + "brokerURL=tcp://activemq-server-ip:port" 
                + "clientId=clientid1";

        String emitterInfluxDB = "influxdb://influxDb?databaseName=influx_db_name"
                + "&batch=true"                                                           
                + "&retentionPolicy=retentionPolicy";

        String emitterStreamOut       = "stream:out";

        //************************************************************************
        // Data from cryring_db_inbound to InfluxDB
        //************************************************************************
        from(consumerCryringInbound)    
                .process(processor code)
                .to(emitterInfluxDB)
                .onException(Exception.class) 
                .useOriginalMessage()
                .handled(true) 
                .log("error") 
                .to(emitterStreamOut);
    }
}

...其他路由也类似,每个路由都有自己的clientId。

单于俊智
2023-03-14

只有一个客户端可以使用ClientID。它们必须是唯一的,而不是您可能想要手动设置的。另一种选择可能是设置ClientIDPrefix,以便获得更好的方法来识别哪个应用程序正在使用。

 类似资料:
  • 我是新手RabbitMQ java客户端。我的问题:我创建了10个consumer并将它们添加到队列中。每个消费者使用10秒来处理我的流程。我检查了Rabbit的页面,我看到我的队列有4000条消息没有发送到客户端。我检查了日志客户端,结果是为一个消费者获取一条消息,10秒后,我为一个消费者获取一条消息,依此类推…我想要得到10个消息为所有消费者在当时(10个消息-10消费者过程在当时)请帮助我,

  • 我正在运行同一个Spring Boot 2.0.4应用程序的多个实例,出于扩展目的,它们使用以下命令消耗来自ActiveMQ队列的消息:@JmsListener(目的地="myQ") 只有第一个消费者接收消息,如果我停止第一个消费者,第二个实例开始接收消息。我希望每个消费者以循环方式使用一条消息,而不是同一条消息。但只有第一消费者消费消息。

  • JMS队列有2个消费者,同步和异步Java应用程序进程等待响应。1)同步应用程序发送请求,并根据JMS相关ID等待响应60秒。2)异步线程将不断侦听同一队列。

  • 我有两个经纪人A和B。如果我想将消息从A转发给B,一切都很简单。我只需要代理中的网络连接器,如下所示: 如果我想从其他队列中使用来自代理 B 的消息,我会犹豫不决(让我们将其命名为 QUEUE。自。消费)我只需要做同样的事情,但双工设置为true,只需听队列。自。在代理 A 上消费,如下所示: 但是它不像我预期的那样工作。似乎每一秒钟只有一条信息被转发,其余的都丢失了。令人惊讶的是,这在代理B队列

  • 我使用Spring JMS和ActiveMQ,其中有一个客户机将消息推送到队列,有多个使用者线程监听并从队列中删除消息。有些时候,相同的消息会被两个使用者从队列中出列。我不希望这种行为,并希望确保仅有的一条消息由一个消费者线程处理。你知道我哪里出了问题吗? ActiveMQ 5.9.1配置:

  • ActiveMQ消息组是跨多个使用者进行负载平衡的一个非常好的特性。简而言之:消息流根据消息中嵌入的组标识符()在单个队列的多个使用者之间进行分区。(因此,使用者1将获得的所有消息,使用者2将获得的所有消息,依此类推) 现在,假设您有两个队列:和,并假设在流经这两个队列的消息中使用一致的S分类法。代理为on queue选择的使用者是否与代理为on queue选择的连接相同? 但是,我们能模拟这种行