我有一个ActiveMQ服务器的实例,它运行着几个数据源,如果消息消费失败,它会将数据推送到两个队列中,一个是DLQ。我使用Apache Camel来消费和处理来自这些队列的消息,并想将其写入InfxDB。
然而,到目前为止,我未能让堆栈运行,以使Apache Camel并行消耗所有队列。我总是遇到这种错误:
ERROR 20831 --- [r[ActiveMQ.DLQ]]
c.c.j.DefaultJmsMessageListenerContainer : Could not refresh JMS
Connection for destination 'ActiveMQ.DLQ' - retrying using
FixedBackOff{interval=5000, currentAttempts=270,
maxAttempts=unlimited}. Cause: Broker: localhost - Client: Influx
Message Queue already connected from tcp://ip-of-machine-running-route:port
一个Apache Camel实例如何使用多个队列?
我尝试了两种方法:
目前,我的代码如下所示:
Camel配置
@Configuration
public class CamelConfig {
@Bean
public ShutdownStrategy shutdownStrategy() {
MessageLogger.logInfo(getClass(), "Camel Route: STARTING...",
Thread.currentThread().getStackTrace()[0].getMethodName());
DefaultShutdownStrategy strategy = new DefaultShutdownStrategy();
int timeout = 1200;
MessageLogger.logInfo(getClass(), "Camel Route: Timeout for shutdown: " + timeout + " seconds.",
Thread.currentThread().getStackTrace()[0].getMethodName());
strategy.setTimeout(timeout); // TODO make it configurable
return strategy;
}
}
ActiveMQ客户端配置
@Configuration
public class ActiveMqClientConfig {
@Bean
public ActiveMQConnectionFactory registerActiveMQConnectionFactory() {
MessageLogger.logInfo(getClass(), "ActiveMQ Listener: STARTING...",
Thread.currentThread().getStackTrace()[0].getMethodName());
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL("tcp://servername:61616");
connectionFactory.setUserName(username);
connectionFactory.setPassword(passwd);
connectionFactory.setUseAsyncSend(false);
connectionFactory.setClientID("Influx Message Queue");
connectionFactory.setConnectResponseTimeout(300);
MessageLogger.logInfo(getClass(), "ActiveMQ Listener: STARTED",
Thread.currentThread().getStackTrace()[0].getMethodName());
return connectionFactory;
}
}
流入配置
@Configuration
public class InfluxDBClientConfig {
@Bean
public InfluxDbOkHttpClientBuilderProvider registerInfluxDbOkHttpClientBuilderProvider() {
return () -> {
MessageLogger.logInfo(getClass(), "InfluxDB Client: STARTING...",
Thread.currentThread().getStackTrace()[0].getMethodName());
Builder builder = new OkHttpClient.Builder() //
.readTimeout(1200, TimeUnit.SECONDS) //
.writeTimeout(1200, TimeUnit.SECONDS) //
.connectTimeout(1200, TimeUnit.SECONDS) //
.retryOnConnectionFailure(true);
MessageLogger.logInfo(getClass(), "InfluxDB Client: STARTED - " + builder.toString(),
Thread.currentThread().getStackTrace()[0].getMethodName());
return builder;
};
}
}
具有多个路由的组件:
@Component
public class ActiveMqToInfluxRoute extends RouteBuilder {
@Autowired
private FrameworkConfig frameworkConfig;
@Override
public void configure() throws Exception {
String consumerQueueq = "activemq:queue:queue1?" //
+ "brokerURL=tcp://ip:port";
String consumerActiveMqDLQ = "activemq:queue:ActiveMQ.DLQ?" //
+ "brokerURL=tcp://ip:port";
String consumerQueue2 = "activemq:queue:queue2?" //
+ "brokerURL=tcp://ip:port";
String emitterInfluxDB = "influxdb://influxDb?databaseName=databaseName"
+ "&batch=true" //
+ "&retentionPolicy=retentionPolicy"
String emitterStreamOut = "stream:out";
//************************************************************************
// Data from cryring_db_inbound to InfluxDB
//************************************************************************
from(consumerCryringInbound) //
.process(messagePayload -> {
Message message = messagePayload.getIn();
if (message.getBody(String.class).toString().startsWith("@MultiRecords")) {
Processor.processMessage(message.getBody(String.class), message);
} else {
Processor.processMessage(message);
}
})//
.to(emitterInfluxDB) //
.onException(Exception.class) //
.useOriginalMessage() //
.handled(true) //
.log("error") //
.to(emitterStreamOut);
//************************************************************************
// Data from cryring_db_inbound to InfluxDB
//************************************************************************
from(consumerActiveMqDLQ) //
.process(messagePayload -> {
Message message = messagePayload.getIn();
if (message.getBody(String.class).toString().startsWith("@MultiRecords")) {
Processor.processMessage(message.getBody(String.class), message);
} else {
Processor.processMessage(message);
}
})//
.to(emitterInfluxDB) //
.onException(Exception.class) //
.useOriginalMessage() //
.handled(true) //
.log("error") //
.to(emitterStreamOut);
//************************************************************************
// Data from olog_inbound to olog
//************************************************************************
from(consumerOlog) //
.process(messagePayload -> {
System.out.println(messagePayload.getIn());
}) //
.to(emitterStreamOut);
}
}
与 Camel 多个消费者实现问题建议的(一个@Component中有多个 from().to() 路由)相反,我设法通过将路由拆分为单独的组件来完成这项工作,每个组件都有一个单独的 clientId。此外,我用UUID替换了ActiveMQ配置中的静态客户端ID。法典:
ActiveMQ-Config:
@Configuration
public class ActiveMqClientConfig {
@Bean
public ActiveMQConnectionFactory registerActiveMQConnectionFactory() {
MessageLogger.logInfo(getClass(), "ActiveMQ Listener: STARTING...",
Thread.currentThread().getStackTrace()[0].getMethodName());
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL("tcp://servername:port");
connectionFactory.setUserName(username);
connectionFactory.setPassword(password);
connectionFactory.setUseAsyncSend(false);
connectionFactory.setClientID(UUID.randomUUID().toString());
connectionFactory.setConnectResponseTimeout(300);
MessageLogger.logInfo(getClass(), "ActiveMQ Listener: STARTED",
Thread.currentThread().getStackTrace()[0].getMethodName());
return connectionFactory;
}
}
路由组件:
@Component
public class ActiveMqToInfluxRoute extends RouteBuilder {
@Autowired
private FrameworkConfig frameworkConfig;
@Override
public void configure() throws Exception {
String consumerCryringInbound = "activemq:queue:queue1?"
+ "brokerURL=tcp://activemq-server-ip:port"
+ "clientId=clientid1";
String emitterInfluxDB = "influxdb://influxDb?databaseName=influx_db_name"
+ "&batch=true"
+ "&retentionPolicy=retentionPolicy";
String emitterStreamOut = "stream:out";
//************************************************************************
// Data from cryring_db_inbound to InfluxDB
//************************************************************************
from(consumerCryringInbound)
.process(processor code)
.to(emitterInfluxDB)
.onException(Exception.class)
.useOriginalMessage()
.handled(true)
.log("error")
.to(emitterStreamOut);
}
}
...其他路由也类似,每个路由都有自己的clientId。
只有一个客户端可以使用ClientID
。它们必须是唯一的,而不是您可能想要手动设置的。另一种选择可能是设置ClientIDPrefix
,以便获得更好的方法来识别哪个应用程序正在使用。
我是新手RabbitMQ java客户端。我的问题:我创建了10个consumer并将它们添加到队列中。每个消费者使用10秒来处理我的流程。我检查了Rabbit的页面,我看到我的队列有4000条消息没有发送到客户端。我检查了日志客户端,结果是为一个消费者获取一条消息,10秒后,我为一个消费者获取一条消息,依此类推…我想要得到10个消息为所有消费者在当时(10个消息-10消费者过程在当时)请帮助我,
我正在运行同一个Spring Boot 2.0.4应用程序的多个实例,出于扩展目的,它们使用以下命令消耗来自ActiveMQ队列的消息:@JmsListener(目的地="myQ") 只有第一个消费者接收消息,如果我停止第一个消费者,第二个实例开始接收消息。我希望每个消费者以循环方式使用一条消息,而不是同一条消息。但只有第一消费者消费消息。
JMS队列有2个消费者,同步和异步Java应用程序进程等待响应。1)同步应用程序发送请求,并根据JMS相关ID等待响应60秒。2)异步线程将不断侦听同一队列。
我有两个经纪人A和B。如果我想将消息从A转发给B,一切都很简单。我只需要代理中的网络连接器,如下所示: 如果我想从其他队列中使用来自代理 B 的消息,我会犹豫不决(让我们将其命名为 QUEUE。自。消费)我只需要做同样的事情,但双工设置为true,只需听队列。自。在代理 A 上消费,如下所示: 但是它不像我预期的那样工作。似乎每一秒钟只有一条信息被转发,其余的都丢失了。令人惊讶的是,这在代理B队列
我使用Spring JMS和ActiveMQ,其中有一个客户机将消息推送到队列,有多个使用者线程监听并从队列中删除消息。有些时候,相同的消息会被两个使用者从队列中出列。我不希望这种行为,并希望确保仅有的一条消息由一个消费者线程处理。你知道我哪里出了问题吗? ActiveMQ 5.9.1配置:
ActiveMQ消息组是跨多个使用者进行负载平衡的一个非常好的特性。简而言之:消息流根据消息中嵌入的组标识符()在单个队列的多个使用者之间进行分区。(因此,使用者1将获得的所有消息,使用者2将获得的所有消息,依此类推) 现在,假设您有两个队列:和,并假设在流经这两个队列的消息中使用一致的S分类法。代理为on queue选择的使用者是否与代理为on queue选择的连接相同? 但是,我们能模拟这种行