当前位置: 首页 > 知识库问答 >
问题:

无法使用ActiveMQ同时向两个订户发布主题消息

裴俊迈
2023-03-14

我提到了SpringBoot应用程序发布和读取ActiveMQ主题,使用ActiveMQ发布一个主题。我已经创建了两个从主题中读取消息的接收器微服务。我还创建了rest endpoint来发布主题。但是,我必须执行这个restendpoint两次,以便为两个接收者发布消息。restendpoint的第一次执行将向Receiver1发送消息。restendpoint的第二次执行将向Receiver2发送消息

因此,两个接收器不能同时读取主题
这是我的代码。

PublisherApplication.java

package com.springboot;

//import statements

@SpringBootApplication
@EnableDiscoveryClient
@EnableJms
public class PublisherApplication {

    @Bean
    public JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();      
        // This provides all boot's default to this factory, including the message converter
        configurer.configure(factory, connectionFactory);
        //setPubSubDomain identifies Topic in ActiveMQ
        factory.setPubSubDomain(true);
        return factory;
    }


    public static void main(String[] args) {  
        SpringApplication.run(PublisherApplication.class, args);
    }

    @Bean
    public RestTemplate restTemplate(RestTemplateBuilder builder) {
        return builder.build();

    }

}

发布消息.java
[发布主题的Restendpoint]

package com.springboot.controller;

//import statements

@RestController
@RequestMapping(path = "/schoolDashboard/topic")
class PublishMessage {

    public static final String MAILBOX_TOPIC = "mailbox.topic";

    @Autowired
    private JmsTemplate jmsTemplate;

    @GetMapping(path = "/sendEmail")
    public void sendStudentById() throws Exception{
        System.out.println("Anindya-TopicSendMessage.java :: Publishing Email sent....");
        jmsTemplate.convertAndSend(MAILBOX_TOPIC, "Topic - Email Sent");
    }

}

接收器应用程序01
[注意 - 接收器01是第一个微服务]

package com.springboot;

//import statements

@SpringBootApplication
@EnableJms
public class ReceiverApplication01 {


    @Bean
    public JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();      
        // This provides all boot's default to this factory, including the message converter
        configurer.configure(factory, connectionFactory);
        //setPubSubDomain identifies Topic in ActiveMQ
        factory.setPubSubDomain(true);
        return factory;
    }

    public static void main(String[] args) {
        SpringApplication.run(ReceiverApplication01.class, args);
    }

}

TopicMesssgeReceiver01.java
[Receiver01从主题读取消息]

package com.springboot.message;

//import statement

@Component
public class TopicMesssgeReceiver01 {

    private final SimpleMessageConverter converter = new SimpleMessageConverter();

    public static final String MAILBOX_TOPIC = "mailbox.topic";

    @JmsListener(destination = MAILBOX_TOPIC, containerFactory = "topicListenerFactory")
    public void receiveMessage(final Message message) throws JMSException{
        System.out.println("Receiver01 <" + String.valueOf(this.converter.fromMessage(message)) + ">");
    }

}

ReceiverApplication 02
[注意:-Receiver02是第二个微服务]

package com.springboot;

//import statement

@SpringBootApplication
@EnableJms
public class ReaderApplication02 {

    @Bean
    public JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setPubSubDomain(true);
        configurer.configure(factory, connectionFactory);       
        return factory;
    }

    public static void main(String[] args) {
        SpringApplication.run(ReaderApplication02.class, args);
    }

}

主题消息接收者02
[接收者02 从主题读取消息]


package com.springboot.message;

//import statement

@Component
public class TopicMesssgeReceiver02 {


private final SimpleMessageConverter converter = new SimpleMessageConverter();

    public static final String MAILBOX_TOPIC = "mailbox.topic";

    @JmsListener(destination = MAILBOX_TOPIC, containerFactory = "topicListenerFactory")
    public void receiveMessage(final Message message) throws Exception{
        System.out.println("Receiver02 <" + String.valueOf(this.converter.fromMessage(message)) + ">");
    }

}

共有1个答案

叶越
2023-03-14

谢谢你纳文。!最后,我能够做到这一点。< br >我们必须只设置setubsubdomain(true);spring-boot会处理所有锅炉板代码。< br >现在,两个接收器微服务可以同时从主题中读取消息< br >以下是代码更改

发布消息.java
[发布主题的Restendpoint]

package com.springboot.controller;

//import statements

@RestController
@RequestMapping(path = "/schoolDashboard/topic")
class PublishMessage {

    public static final String MAILBOX_TOPIC = "mailbox.topic";

    @Autowired
    private JmsTemplate jmsTemplate;

    @GetMapping(path = "/sendEmail")
    public void sendStudentById() throws Exception{
        System.out.println("Publisher :: Message sent...");
        /* Added this statement. setPubSubDomain(true) identifies Topic in ActiveMQ */
        jmsTemplate.setPubSubDomain(true);
        jmsTemplate.convertAndSend(MAILBOX_TOPIC, "Topic - Email Sent");
    }

}

ReceiverApplication 02
[注意:-Receiver02是第二个微服务]

package com.springboot;

//import statement

@SpringBootApplication
@EnableJms
public class ReaderApplication02 {

    @Bean
    public JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();       
        configurer.configure(factory, connectionFactory);    
        /* setPubSubDomain(true) should be placed after 
         * configuration of the specified jms listener container factory*/
        factory.setPubSubDomain(true);
        return factory;
    }

    public static void main(String[] args) {
        SpringApplication.run(ReaderApplication02.class, args);
    }

}

 类似资料:
  • 我正在使用kafka java客户端和kafka服务器。 我的代码: Kafka马纳格 当我的循环长度如果在1000左右(在类)时,我就能成功地向Kafka主题发送数据。 但当我的循环长度为1或小于10时,我无法向Kafka主题发送数据。注意我没有得到任何错误。 根据我的发现,如果我想发送一个单一的消息到Kafka主题,根据这个程序我得到了成功的消息,但从来没有得到一个关于我的主题的消息。 但是如

  • 我试图创建基于@JmsListener注释的发布-订阅示例:https://github.com/lkrnac/book-eiws-code-samples/tree/master/05-jms/0515-publish-subscribe 相关代码片段: 问题是,要获得这种行为: 但每个消息都应该由两个侦听器根据主题的定义来使用。我错过了什么?

  • 在我们的业务需求中,我们需要将更新传输到分布在全国各地的数千个客户端。问题是,许多这些客户端使用3G网络连接到我们,因此,发生了许多连接/断开连接...我们需要提供的更新是诸如“企业A不能再兑现”或“企业B能够再次兑现”之类的东西,我们正在考虑使用ActiveMQ持久主题来提供这些更新。我的理解是,一旦客户端连接到持久主题,即使他断开连接,每当他回来时,他都会在脱机时收到发送到该主题的消息。最大的

  • 我有一个websocket服务器和一个websocket客户端,都是Java的。websocket服务器具有以下功能: 在 Java 网页滑板客户端中,我在我的踩踏会话处理程序中提供了以下内容: 然后,我能够通过客户端向服务器路径“hello”发送消息来在两者之间进行通信,然后由于客户端订阅了“topic/greetings”,所以我也要用我的stompFrameHandler来处理响应。 但是我

  • 我是ActiveMQ新手。我曾尝试在activemq中实现生产者-消费者(发送者-接收器)。在我的代码中,我很容易发送 这是我的制片人 MsgProducer。Java语言 MsgConsumer.java 有谁能帮我找出向多个消费者发送信息的方法吗。提前谢谢。