当前位置: 首页 > 知识库问答 >
问题:

Spring中使用JMS模板/消息订阅服务器的JMS主题订阅服务器

卜昂熙
2023-03-14

我有一个使用ActiveMQ的JMS生产者/订阅者的简单Spring应用程序,配置如下:

<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616" />
    <property name="userName" value="user" />
    <property name="password" value="password" />
</bean>
<bean id="messageDestination" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg value="messageQueue1" />
</bean>

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE">
    </property>
</bean>

<bean id="springJmsProducer" class="SpringJmsProducer">
    <property name="destination" ref="messageDestination" />
    <property name="jmsTemplate" ref="jmsTemplate" />
</bean>

<bean id="springJmsConsumer" class="SpringJmsConsumer">
    <property name="destination" ref="messageDestination" />
    <property name="jmsTemplate" ref="jmsTemplate" />
</bean>
public class SpringJmsProducer {
private JmsTemplate jmsTemplate;
private Destination destination;

public JmsTemplate getJmsTemplate() {
    return jmsTemplate;
}

public void setJmsTemplate(JmsTemplate jmsTemplate) {
    this.jmsTemplate = jmsTemplate;
}

public Destination getDestination() {
    return destination;
}

public void setDestination(Destination destination) {
    this.destination = destination;
}

public void sendMessage(final String msg) {
    jmsTemplate.send(destination, new MessageCreator() {
        public Message createMessage(Session session) throws JMSException {
            return session.createTextMessage(msg);
        }});        
 }
}
public class SpringJmsConsumer {
private JmsTemplate jmsTemplate;
private Destination destination;

public JmsTemplate getJmsTemplate() {
    return jmsTemplate;
}

public void setJmsTemplate(JmsTemplate jmsTemplate) {
    this.jmsTemplate = jmsTemplate;
}

public Destination getDestination() {
    return destination;
}

public void setDestination(Destination destination) {
    this.destination = destination;
}

public String receiveMessage() throws JMSException {
    TextMessage textMessage =(TextMessage) jmsTemplate.receive(destination);        
    return textMessage.getText();
 }
}

我试过所有可能的解决办法,但没有一个奏效。我们非常感谢任何帮助

共有1个答案

董俊
2023-03-14

如果您希望消费者在开始之前收到发送到主题的消息,您有2个选择:

1.使用Activemq追溯使用者

背景一个追溯使用者只是一个常规的JMS主题使用者,它指示在订阅开始时,每一次尝试都应该用来回到过去,并发送使用者可能错过的任何旧消息(或上一次在该主题上发送的消息)。

topic = new ActiveMQTopic("TEST.Topic?consumer.retroactive=true");

2.使用持久订户:

请注意,持久订阅者在第二次运行开始之前接收发送到主题的消息

http://activemq.apache.org/manage-production-subscribers.html

这可以异步使用DefaultMessageListenerContainer

<bean id="jmsContainer" destroy-method="shutdown"
    class="org.springframework.jms.listener.DefaultMessageListenerContainer" >
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="destination" ref="messageDestination" />
    <property name="messageListener" ref="messageListenerAdapter" />
    <property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE" />
    <property name="subscriptionDurable" value="true" />
    <property name="clientId" value="UniqueClientId" />
</bean>

<bean id="messageListenerAdapter"
    class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <constructor-arg ref="springJmsConsumer" />
</bean>
<bean id="springJmsConsumer" class="SpringJmsConsumer">
</bean>

并更新您的使用者:

public class SpringJmsConsumer implements javax.jms.MessageListener {

    public void onMessage(javax.jms.Message message) {
        // treat message;
        message.acknowledge();
    }
}

更新以使用

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;

public class SpringJmsConsumer {

    private Connection conn;
    private TopicSubscriber topicSubscriber;

    public SpringJmsConsumer(ConnectionFactory connectionFactory, Topic destination ) {
        conn = connectionFactory.createConnection("user", "password");
        Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        topicSubscriber = session.createDurableSubscriber(destination, "UniqueClientId");
        conn.start();
    }

    public String receiveMessage() throws JMSException {
        TextMessage textMessage = (TextMessage) topicSubscriber.receive();
        return textMessage.getText();
    }
}

并更新springJmsConsumer

<bean id="springJmsConsumer" class="SpringJmsConsumer">
    <constructor-arg ref="connectionFactory" />
    <constructor-arg ref="messageDestination" />
</bean>

请注意,此代码不管理连接故障。

 类似资料:
  • 我试图利用固有的WSO2ESB主题发布到jms队列。我已经创建了主题,并提供了一个订阅者URL:jms:/topictest?transport.jms.destinationtype=queue。然而,当我将消息发布到主题时,它不能被传递到队列。日志生成以下内容 “系统无法从jms:/queue?destination=topictest URL推断传输信息。” 另外,我似乎不知道如何发布到WS

  • 我正在试验消息驱动Beans,以便从外部ActiveMQ实例接收主题订阅消息。 我的测试首先从队列订阅开始,它工作得很好。 然后我想尝试主题订阅,但我无法让它工作。 这就是我所拥有的: 会议记录。xml 这是MDB: 我不知道为什么,但从日志中我可以看到,TomEE创建了一个队列,而不是一个主题: 另一个证明是,当我添加持续时间配置时,服务器不会启动: 然后服务器抱怨这不适合配置类型javax.j

  • 有可能做到这一点吗?

  • 目前,我已经开始使用ActiveMQ处理JMS主题。我已经通过JAVA代码(如下所述)创建了发布者和持久订阅者,并且在订阅者端也收到了消息。 Publisher.Java 订阅者.java 我对以下主题有一些疑问, 如何检查有多少订阅者使用 Java JMS 在主题中主动查找消息? 如何从主题中获取活动和持久订阅者列表? 我们是否可以删除主题中发布的消息? 在这些情况下帮助我。 提前致谢。

  • 我基本上需要从python服务器向设备发送命令,设备将发布对主题的回复,我需要捕获回复服务器端。要从服务器发布到设备,我正在使用boto3物联网数据模块。但是我如何订阅另一个主题以从设备获得回复?似乎没有办法使用aws python库。我需要使用像paho这样的遗传MQTT客户机吗? 谢谢你。

  • 我正在使用Java和Qpid JMS 0.23测试发布/订阅。 我在SB中创建了一个名为“测试主题”的主题。 我可以从测试应用程序向主题发布消息,但在尝试订阅(动态创建订阅)时失败,例外情况: javax.jms.InvalidDestinationException:找不到消息传递实体mynamesspace:主题:test.topic~15|DurableSubscriber2。Trackin