当前位置: 首页 > 知识库问答 >
问题:

Spring Boot嵌入式HornetQ集群不转发消息

燕永昌
2023-03-14
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-hornetq</artifactId>
</dependency>
<dependency>
    <groupId>org.hornetq</groupId>
    <artifactId>hornetq-jms-server</artifactId>
</dependency>
@SpringBootApplication
@EnableScheduling
public class DemoHornetqServerSendApplication {
    @Autowired
    private JmsTemplate jmsTemplate;
    private @Value("${spring.hornetq.embedded.queues}") String testQueue;

    public static void main(String[] args) {
        SpringApplication.run(DemoHornetqServerSendApplication.class, args);
    }

    @Scheduled(fixedRate = 5000)
    private void sendMessage() {
        String message = "Timestamp from Server: " + System.currentTimeMillis();
        System.out.println("Sending message: " + message);
        jmsTemplate.convertAndSend(testQueue, message);
    }

    @Bean
    public HornetQConfigurationCustomizer hornetCustomizer() {
        return new HornetQConfigurationCustomizer() {

            @Override
            public void customize(Configuration configuration) {
                String serverSendConnectorName = "server-send-connector";
                String serverReceiveConnectorName = "server-receive-connector";

                Map<String, TransportConfiguration> connectorConf = configuration.getConnectorConfigurations();

                Map<String, Object> params = new HashMap<String, Object>();
                params.put(TransportConstants.HOST_PROP_NAME, "localhost");
                params.put(TransportConstants.PORT_PROP_NAME, "5445");
                TransportConfiguration tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
                connectorConf.put(serverSendConnectorName, tc);

                Set<TransportConfiguration> acceptors = configuration.getAcceptorConfigurations();
                tc = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
                acceptors.add(tc);

                params = new HashMap<String, Object>();
                params.put(TransportConstants.HOST_PROP_NAME, "localhost");
                params.put(TransportConstants.PORT_PROP_NAME, "5446");
                tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
                connectorConf.put(serverReceiveConnectorName, tc);

                List<String> staticConnectors = new ArrayList<String>();
                staticConnectors.add(serverReceiveConnectorName);
                ClusterConnectionConfiguration conf = new ClusterConnectionConfiguration(
                        "my-cluster", // name
                        "jms", // address
                        serverSendConnectorName, // connector name
                        500, // retry interval
                        true, // duplicate detection
                        true, // forward when no consumers
                        1, // max hops
                        1000000, // confirmation window size
                        staticConnectors, 
                        true // allow direct connections only
                        );
                configuration.getClusterConfigurations().add(conf);

                AddressSettings setting = new AddressSettings();
                setting.setRedistributionDelay(0);
                configuration.getAddressesSettings().put("#", setting);
            }
        };
    }
}
spring.hornetq.mode=embedded
spring.hornetq.embedded.enabled=true
spring.hornetq.embedded.queues=jms.testqueue
spring.hornetq.embedded.cluster-password=password

DemohornetQServerReceiveApplication:

@SpringBootApplication
@EnableJms
public class DemoHornetqServerReceiveApplication {
    @Autowired
    private JmsTemplate jmsTemplate;
    private @Value("${spring.hornetq.embedded.queues}") String testQueue;

    public static void main(String[] args) {
        SpringApplication.run(DemoHornetqServerReceiveApplication.class, args);
    }

    @JmsListener(destination="${spring.hornetq.embedded.queues}")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }

    @Bean
    public HornetQConfigurationCustomizer hornetCustomizer() {
        return new HornetQConfigurationCustomizer() {

            @Override
            public void customize(Configuration configuration) {
                String serverSendConnectorName = "server-send-connector";
                String serverReceiveConnectorName = "server-receive-connector";

                Map<String, TransportConfiguration> connectorConf = configuration.getConnectorConfigurations();

                Map<String, Object> params = new HashMap<String, Object>();
                params.put(TransportConstants.HOST_PROP_NAME, "localhost");
                params.put(TransportConstants.PORT_PROP_NAME, "5446");
                TransportConfiguration tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
                connectorConf.put(serverReceiveConnectorName, tc);

                Set<TransportConfiguration> acceptors = configuration.getAcceptorConfigurations();
                tc = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
                acceptors.add(tc);

                params = new HashMap<String, Object>();
                params.put(TransportConstants.HOST_PROP_NAME, "localhost");
                params.put(TransportConstants.PORT_PROP_NAME, "5445");
                tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
                connectorConf.put(serverSendConnectorName, tc);

                List<String> staticConnectors = new ArrayList<String>();
                staticConnectors.add(serverSendConnectorName);
                ClusterConnectionConfiguration conf = new ClusterConnectionConfiguration(
                        "my-cluster", // name
                        "jms", // address
                        serverReceiveConnectorName, // connector name
                        500, // retry interval
                        true, // duplicate detection
                        true, // forward when no consumers
                        1, // max hops
                        1000000, // confirmation window size
                        staticConnectors, 
                        true // allow direct connections only
                        );
                configuration.getClusterConfigurations().add(conf);

                AddressSettings setting = new AddressSettings();
                setting.setRedistributionDelay(0);
                configuration.getAddressesSettings().put("#", setting);
            }
        };
    }
}

Application.Properties(ServerReceive)

spring.hornetq.mode=embedded
spring.hornetq.embedded.enabled=true
spring.hornetq.embedded.queues=jms.testqueue
spring.hornetq.embedded.cluster-password=password

启动这两个应用程序后,日志输出显示如下:

2015-04-09 11:11:58.471  INFO 7536 --- [           main] org.hornetq.core.server                  : HQ221000: live server is starting with configuration HornetQ Configuration (clustered=true,backup=false,sharedStore=true,journalDirectory=C:\Users\****\AppData\Local\Temp\hornetq-data/journal,bindingsDirectory=data/bindings,largeMessagesDirectory=data/largemessages,pagingDirectory=data/paging)  
2015-04-09 11:11:58.501  INFO 7536 --- [           main] org.hornetq.core.server                  : HQ221045: libaio is not available, switching the configuration into NIO  
2015-04-09 11:11:58.595  INFO 7536 --- [           main] org.hornetq.core.server                  : HQ221043: Adding protocol support CORE  
2015-04-09 11:11:58.720  INFO 7536 --- [           main] org.hornetq.core.server                  : HQ221003: trying to deploy queue jms.queue.jms.testqueue  
2015-04-09 11:11:59.568  INFO 7536 --- [           main] org.hornetq.core.server                  : HQ221020: Started Netty Acceptor version 4.0.13.Final localhost:5445  
2015-04-09 11:11:59.593  INFO 7536 --- [           main] org.hornetq.core.server                  : HQ221007: Server is now live  
2015-04-09 11:11:59.593  INFO 7536 --- [           main] org.hornetq.core.server                  : HQ221001: HornetQ Server version 2.4.5.FINAL (Wild Hornet, 124)   [c139929d-d90f-11e4-ba2e-e58abf5d6944] 
2015-04-09 11:12:04.401  INFO 4528 --- [           main] org.hornetq.core.server                  : HQ221000: live server is starting with configuration HornetQ Configuration (clustered=true,backup=false,sharedStore=true,journalDirectory=C:\Users\****\AppData\Local\Temp\hornetq-data/journal,bindingsDirectory=data/bindings,largeMessagesDirectory=data/largemessages,pagingDirectory=data/paging)  
2015-04-09 11:12:04.410  INFO 4528 --- [           main] org.hornetq.core.server                  : HQ221045: libaio is not available, switching the configuration into NIO  
2015-04-09 11:12:04.520  INFO 4528 --- [           main] org.hornetq.core.server                  : HQ221043: Adding protocol support CORE  
2015-04-09 11:12:04.629  INFO 4528 --- [           main] org.hornetq.core.server                  : HQ221003: trying to deploy queue jms.queue.jms.testqueue  
2015-04-09 11:12:05.545  INFO 4528 --- [           main] org.hornetq.core.server                  : HQ221020: Started Netty Acceptor version 4.0.13.Final localhost:5446  
2015-04-09 11:12:05.578  INFO 4528 --- [           main] org.hornetq.core.server                  : HQ221007: Server is now live  
2015-04-09 11:12:05.578  INFO 4528 --- [           main] org.hornetq.core.server                  : HQ221001: HornetQ Server version 2.4.5.FINAL (Wild Hornet, 124)   [c139929d-d90f-11e4-ba2e-e58abf5d6944] 
Sending message: Timestamp from Server: 1428574324910  
Sending message: Timestamp from Server: 1428574329899  
Sending message: Timestamp from Server: 1428574334904  

消息似乎没有从ServerEnd转发到ServerReceive。

我做了更多的测试,创建了另外两个Spring Boot应用程序(ClientSend和ClientReceive),它们没有嵌入HornetQ服务器,而是连接到“本地”服务器。

用于两个客户端应用程序的pom.xml包含:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-hornetq</artifactId>
</dependency>
@SpringBootApplication
@EnableScheduling
public class DemoHornetqClientSendApplication {
    @Autowired
    private JmsTemplate jmsTemplate;
    private @Value("${queue}") String testQueue;

    public static void main(String[] args) {
        SpringApplication.run(DemoHornetqClientSendApplication.class, args);
    }

    @Scheduled(fixedRate = 5000)
    private void sendMessage() {
        String message = "Timestamp from Client: " + System.currentTimeMillis();
        System.out.println("Sending message: " + message);
        jmsTemplate.convertAndSend(testQueue, message);
    }
}
spring.hornetq.mode=native
spring.hornetq.host=localhost
spring.hornetq.port=5446

queue=jms.testqueue
@SpringBootApplication
@EnableJms
public class DemoHornetqClientReceiveApplication {
    @Autowired
    private JmsTemplate jmsTemplate;
    private @Value("${queue}") String testQueue;

    public static void main(String[] args) {
        SpringApplication.run(DemoHornetqClientReceiveApplication.class, args);
    }

    @JmsListener(destination="${queue}")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }
}
spring.hornetq.mode=native
spring.hornetq.host=localhost
spring.hornetq.port=5445

queue=jms.testqueue
Received message: Timestamp from Client: 1428574966630  
Received message: Timestamp from Client: 1428574971600  
Received message: Timestamp from Client: 1428574976595  
Received message: Timestamp from Server: 1428574969436  
Received message: Timestamp from Server: 1428574974438  
Received message: Timestamp from Server: 1428574979446  

如果我让serversend运行一段时间,然后启动clientreceive,它还会接收到该点之前排队的所有消息,因此这表明消息不会在某个地方消失,或者从其他地方被消耗。

为了完整起见,我还将ClientSend指向ServersEndClientReceive,并将ClientReceive指向ServerReceive,以查看集群和InVM客户机是否存在问题,但也没有显示在ClientReceiveServerReceive中接收到任何消息。

因此,到/从每个嵌入代理到直接连接的外部客户机的消息传递似乎工作正常,但集群中的代理之间没有消息转发。

所以,在所有这些之后,最大的问题是,消息不在集群中转发的设置有什么问题?

共有1个答案

阳枫涟
2023-03-14

http://docs.jboss.org/hornetq/2.2.5.final/user-manual/en/html/architecture.html#D0E595

“HornetQ core被设计为一组简单的POJO,因此,如果您有一个内部需要消息传递功能的应用程序,但您不想将其公开为HornetQ服务器,您可以直接在自己的应用程序中实例化和嵌入HornetQ服务器。”

如果嵌入它,就不会将其作为服务器公开。您的每个容器都有一个单独的实例。它相当于启动2个hornet副本,并赋予它们相同的队列名称。一个在第一个实例上写入该队列,另一个在第二个实例上监听该队列。

 类似资料:
  • 我将使用嵌入在 JBoss EAP 6.2 中的 HornetQ 2.3.12,并且需要一些集群队列。 我是否需要设置 JBoss 集群才能让 JMS 集群由大黄蜂 Q 提供支持,或者大黄蜂 Q 是独立的?根据文档,我认为是后者,因为大黄蜂Q集群是大黄蜂Q的一部分,可以在没有JBoss的情况下存在。 节点通过核心网桥连接,因此部署在每个节点中的应用程序将对队列名称执行本地 JNDI 查找,而无需集

  • 问题内容: 对于这两个消息传递系统中的哪个是我,我将不胜感激。 更容易管理 需要了解和避免的陷阱或魔术少 具有较少的总体依赖 很简单。 问题答案: 更容易管理 hornetQ具有清晰的管理API,并且非常易于使用。 需要了解和避免的陷阱或魔术少 hornetQ专为嵌入式案例设计。将其集成到代码中非常非常容易。实际上,您可以用少于10行的代码来完成此操作。 具有较少的整体依赖性 HornetQ上的所

  • 我有时会在pom中看到以下声明。xml。。。 如您所见,sping-boo-starter-web被声明为tomcat-embed-jasper。 是不是sping-boo-starter-web已经有一个嵌入式tomcat了?为什么一些开发人员仍然声明tomcat-embed-jasper以及boot-starter-web?还是有什么原因?

  • 在学习教程的同时,我正在使用SPRING初始化器https://start.spring.io/使用springboot 2.0.2生成带有reactiveMongoDB的项目。 gradle文件列出:compile('org.springframework.boot:spring-boot-starter-data-mongodb-reactive') 我能够将该项目导入eclipse,主类使用

  • 18:38:14,391警告[org.hornetq.core.server.cluster.impl.clusterConnectionImpl](Thread-4(HornetQ-client-global-threads-20937207))MessageFlow RecordImpl[nodeid=3AFB4E60-Ed20-11E1-831C-109add44C09e,connector

  • 我使用的是一个JUnit测试用例中的嵌入式HornetQ实例。 Q221007:服务器现在处于活动状态[FF][ScalaTest-run][2014-06-11 15:03:03,555信息]HornetQServerImpl.java:460-HQ221001:HornetQ服务器版本2.5.0.快照(野生大黄蜂,124)[ea2511b0-e5c6-11e3-a213-b1fcc2ec926