我正在尝试将我的应用程序与JMS队列集成(使用ActiveMQ)。我使用Spring集成作为集成组件。我们希望有连池。已将'maxContopt消费者'作为100提供给'DefaultMessageListenerContainer'。
问题是,一旦从队列中读取了所有消息,“消费者数量”仍为100(如ActiveMq控制台上所示)。当我们在数据库中使用连接池(通过JNDI)时,一旦不再需要连接,它们就会返回到池中,并且打开的连接数量会减少,这在这里不会发生。
任何处理此问题的指针都将有很大帮助。
我的代码如下:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:int="http://www.springframework.org/schema/integration"
xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<!-- Component scan to find all Spring components -->
<context:component-scan base-package="com.poc.springinteg._7" />
<!-- -->
<bean id="remoteJndiTemplate" class="org.springframework.jndi.JndiTemplate" lazy-init="false">
<property name="environment">
<props>
<prop key="java.naming.provider.url">tcp://localhost:61616</prop>
<prop key="java.naming.factory.url.pkgs">org.apache.activemq.jndi</prop>
<prop key="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</prop>
<prop key="connectionFactoryNames">DefaultActiveMQConnectionFactory,QueueConnectionFactory</prop>
<prop key="queue.SendReceiveQueue">org.apache.geronimo.configs/activemq-ra/JCAAdminObject/SendReceiveQueue</prop>
<prop key="queue.SendQueue">org.apache.geronimo.configs/activemq-ra/JCAAdminObject/MDBTransferBeanOutQueue</prop>
</props>
</property>
</bean>
<bean id="remoteConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean" lazy-init="false">
<property name="jndiTemplate" ref="remoteJndiTemplate"/>
<property name="jndiName" value="QueueConnectionFactory"/>
<property name="lookupOnStartup" value="true" />
<property name="proxyInterface" value="javax.jms.ConnectionFactory" />
</bean>
<!-- writing queue -->
<bean id="destinationqueue"
class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg index="0">
<value>OutputQueue_7</value>
</constructor-arg>
</bean>
<int:channel id="outbound"/>
<int-jms:outbound-channel-adapter id="jmsOut"
channel="outbound"
connection-factory="remoteConnectionFactory"
destination="destinationqueue" />
<!-- reading queue -->
<bean id="sourceQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg index="0">
<value>OutputQueue_7</value>
</constructor-arg>
</bean>
<bean id="messageListenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="remoteConnectionFactory"/>
<property name="destination" ref="sourceQueue"/>
<property name="maxConcurrentConsumers" value="10"/>
<property name="concurrentConsumers" value="1"/>
<property name="autoStartup" value="true"/>
</bean>
<int:channel id="inbound"/>
<int-jms:message-driven-channel-adapter id="jmsIn"
channel="inbound"
extract-payload="false"
container="messageListenerContainer" />
<int:service-activator input-channel="inbound"
output-channel="outbound"
ref="messageReader"
method="onMessage" />
</beans>
-- Message Reader Class
import javax.jms.JMSException;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
@Component("messageReader")
public class MessageReader
{
@ServiceActivator
public void onMessage(Message inboundMessage) {
System.out.println(" -------Message Read Start--------");
System.out.println(inboundMessage.getHeaders());
System.out.println(" -------Message Headers Reading completed--------");
System.out.println("payload-->" + inboundMessage.getPayload().getClass());
String payload = inboundMessage.getPayload().toString();
System.out.println("payload value-->" + payload);
org.apache.activemq.command.ActiveMQTextMessage obj = (org.apache.activemq.command.ActiveMQTextMessage)inboundMessage.getPayload();
System.out.println("Object-->" + obj);
String var = null;
try {
var = obj.getText();
System.out.println("Datastructure-->" + obj.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
---- Message Writer Class
@Component("sendMessage")
public class SendMessage {
@Autowired
private MessageChannel outbound;
public void send(String name)
{
Entity entity = new Entity(1,"anuj");
Message<Entity> message = MessageBuilder.withPayload(entity)
.setHeader("Message_Header1", "Message_Header1_Value")
.setHeader("Message_Header2", "Message_Header2_Value")
.build();
outbound.send(message);
}
}
-- Application main class
public class App {
public static void main( String[] args )
{
ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:7_applicationContext.xml" );
SendMessage sendMessage = (SendMessage)applicationContext.getBean( "sendMessage", SendMessage.class);
for(int i=0;i<10;i++){
sendMessage.send("This is Message Content");
}
applicationContext.registerShutdownHook();
}
}
如果将连接工厂包装在一个CachingConnectionFactory
中,所有使用者都将共享一个连接。
消费者将由concurrentConsumers
和maxConcurrentConsumers
之间的容器根据需求进行调整;在一系列活动之后,消费者的数量将需要一段时间才能减少。
在我们的应用程序中,我们使用Spring与ActiveMQ集成。我们面临一个问题,例如每当ActiveMQ服务器关闭时,应用程序都会抛出: 你能建议如何让我的应用程序在ActiveMQ关闭的情况下运行吗 请在下面找到ActiveMQ配置:
如果发生异常,ActiveMQ broker不会相应地重新传递消息。 当我使用进行简单的集成测试时,JMS消息会被重新传递,即可以实现重试机制 如何用ActiveMQ为Tomcat实现同样的功能?
现在当我启动基于spring的应用程序时。我在调试器中看到,我的侦听器方法注册了,但是Spring不连接到ActiveMQ队列,我可以通过查看ActiveMQ web控制台轻松地看到这一点。此外,我没有看到任何来自spring的日志输出,而JMS的东西是这样做的。 我错过什么了吗?从这里走下去最好的路是什么?当然,我也测试了从ActiveMQ的web控制台中发送消息,但是如果没有客户端连接,就没有
Spring中的ActiveMQ插件可以将来自非ActiveMQ的数据集中起来吗? 谢谢,提前
我正在尝试连接到EAP 7.1上的ActiveMq Artemis,它具有传统配置(远程:4447)。我可以使用JMSToolBox通过端口5445连接,但是当我想从我的Spring Boot应用程序使用remote://xxx:4447访问服务器时,我得到了这个警告 对于目标“java:/队列/参与方”,JMS 消息侦听器调用程序的安装失败 - 尝试恢复。原因:无法将org.apache.act
我正在尝试从JMS队列(使用ActiveMQ)读取消息。我面临的问题是,消息正在从队列中读取,但没有显示在“服务激活器”中。 非常感谢您的帮助。 我的代码如下: (1) Spring配置 (2) 服务激活器MDP: (3) 申请开始课程: 谢谢