当前位置: 首页 > 知识库问答 >
问题:

如何从其他应用程序订阅Spring Boot JMS主题

岑俊明
2023-03-14

我有两个应用程序,第一个应用程序启动ActiveMQ代理(https://spring.io/guides/gs/messaging-jms/ ).

在第二个应用程序,我想从第一个应用程序的主题。

我如何在不启动ActiveMQ服务器的情况下执行此操作?

可能的解决办法

服务器应用程序项目

import java.time.LocalDateTime;

import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.ui.ModelMap;

@SpringBootApplication
@EnableJms
@EnableScheduling
public class JsmServerApplication {

    @Autowired
    JmsTemplate jmsTemplate;

    @Bean
    public BrokerService broker() throws Exception {
        BrokerService ret = new BrokerService();
        ret.addConnector("tcp://0.0.0.0:4444"); // allow remote connections
        ret.setBrokerName("primary-broker");
        ret.setUseJmx(true);
        return ret;
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        return new ActiveMQConnectionFactory("tcp://localhost:4444");
    }

    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(jacksonJmsMessageConverter());
        return factory;
    }

    @Bean
    public MessageConverter jacksonJmsMessageConverter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setTargetType(MessageType.TEXT);
        converter.setTypeIdPropertyName("_type");
        return converter;
    }

    public static void main(String[] args) {
        SpringApplication.run(JsmServerApplication.class, args);
    }

    @Scheduled(cron = "*/5 * * * * ?")
    public void run() {
        ModelMap msg = new ModelMap("now", LocalDateTime.now().toString());
        System.out.println("Sending: " + msg);
        jmsTemplate.convertAndSend("messages", msg);
    }

}

客户端应用程序项目

import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;
import org.springframework.ui.ModelMap;

@SpringBootApplication
@EnableJms
public class JsmClientApplication {

    @Bean
    public ConnectionFactory connectionFactory() {
        return new ActiveMQConnectionFactory("tcp://localhost:4444");
    }

    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(jacksonJmsMessageConverter());
        return factory;
    }

    @Bean
    public MessageConverter jacksonJmsMessageConverter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setTargetType(MessageType.TEXT);
        converter.setTypeIdPropertyName("_type");
        return converter;
    }

    @JmsListener(destination = "messages", containerFactory = "jmsListenerContainerFactory")
    public void msg(ModelMap msg) {
        System.out.println(msg);
    }

    public static void main(String[] args) {
        SpringApplication.run(JsmClientApplication.class, args);
    }

}

这是正确的方法吗?

用这个解决了:

http://javasampleapproach.com/java-integration/activemq-work-spring-jms-activemq-topic-publisher-subcribers-pattern-using-springboot

共有2个答案

颜实
2023-03-14

有关如何在tcp端口而不是vm://传输端口上侦听的信息,请参阅此答案。

柳钟展
2023-03-14

您可以使用Message消费者来使用如下代码所示的数据

public static void main(String[] args) throws JMSException {
    // Getting JMS connection from the server
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
    Connection connection = connectionFactory.createConnection();
    connection.start();

    Session session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
    Topic topic = session.createTopic("topic");
    MessageConsumer consumer = session.createConsumer(topic);

    MessageListener listner = new MessageListener() {
        public void onMessage(Message message) {
            try {
                //do operations
           } catch (JMSException e) {

           }
         }
    };

    consumer.setMessageListener(listner);
    connection.close();

}

由于您使用的是ActiveMQConnectionFactory,您可以设置如下代理

BrokerService broker = new BrokerService();
broker.addConnector("tcp://localhost:4444);
broker.setPersistent(false);

ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");

如果您在不使用ActiveMQ方面没有任何限制,那么可以使用Kafka来执行相同的操作。Kafka通过简单的API为您提供了一个高度可扩展的分布式消息总线。

https://kafka.apache.org/quickstart

我不确定限制,但我只是想给你一种Kafka的感觉。然而,上面的代码应该有助于您理解订阅和使用主题消息的概念。

 类似资料:
  • 实现代码 namespace App; use EasySwoole\Core\Swoole\Process\AbstractProcess; use Swoole\Process; class Subscribe extends AbstractProcess { public function run(Process $process) { // TODO: I

  • 我试图开发一个android应用程序,可以擦除其他应用程序的缓存数据,我试图浏览所有的博客,但没有一个对我有效,我可以通过以下代码清除我的应用程序的缓存 我想清除其他应用程序的缓存,可以任何机构请帮助我,如果我错了请纠正我,提前谢谢。

  • 我有一个带有iframe元素的组件。我将src属性设置为原始url,用户可以从那里导航。我想订阅用户导航到的新位置,假设它们在同一个域上。在非angular解决方案中,他们描述了在iframe上放置onload事件,但是在angular中在iframe上放置(load)事件似乎只在初始化时触发一次,而不是像我希望的那样,每次在iframe内有导航事件时都触发一次。我做了一个非常丑陋的解决方案: 但

  • 问题内容: 我有一个Java程序,希望可以在计算机上的任何位置运行。我想从Cygwin命令提示符下运行它。我编写了脚本来调用Java程序。我将Java程序的位置添加到类路径中,并且当我从Java程序的目录运行它们时,脚本可以工作。但是,当我尝试从任何其他目录运行时,我得到: 这是我的脚本: 将Java行更改为以下内容: 产生相同的结果。 问题答案: 在尝试了所有我能想到的一切之后,我回显了该命令,

  • 问题内容: 我的本地glassfish服务器上运行了两个应用程序。一位租用比奇,另一位购买火车票。我现在想从火车应用程序中调用一个远程ejb,以允许在选定的时间内租用自行车,但是我不确定如何做到。我已经尝试了几种不同的方法。两种应用程序使用不同的软件包,火车应用程序必须知道自行车远程接口,但我不知道如何实现。 远程界面(bicyle应用): 我想调用的Bicycle EJB: 火车应用程序的开始:

  • 我有以下代码 我的问题:当我添加多个主题订阅时(即上面的A,B,C),Kstream代码停止接收记录。 参考文献:https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/StreamsBuilder.html 相关文件 我想实现的是:让一个Kstream(即上面的“源”)消耗/处理多个主题。