Question:

ActiveMQ Artemis prefixes every topic name defined on a Spring Boot client with "jms.topic."

刘骏祥
2023-03-14

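I am using Artemis 2.18.0 with a Spring Boot client that depends on version 2.5.5 of spring-boot-starter-artemis. In my use case, the clients need to communicate with each other through topics. The problem is that the string jms.topic. is prepended to every topic defined on the client. For example, the topic foo.sendinfo becomes jms.topic.foo.sendinfo.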

The broker's XML configuration file is shown below. The acceptor used by the Spring Boot client is netty-ssl-acceptor on port 61617.

<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->

<configuration xmlns="urn:activemq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xmlns:xi="http://www.w3.org/2001/XInclude"
               xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">

   <core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="urn:activemq:core ">

  <name>0.0.0.0</name>


  <persistence-enabled>true</persistence-enabled>

  <!-- this could be ASYNCIO, MAPPED, NIO
       ASYNCIO: Linux Libaio
       MAPPED: mmap files
       NIO: Plain Java Files
   -->
  <journal-type>NIO</journal-type>

  <paging-directory>data/paging</paging-directory>

  <bindings-directory>data/bindings</bindings-directory>

  <journal-directory>data/journal</journal-directory>

  <large-messages-directory>data/large-messages</large-messages-directory>

  <!--
  if you want to retain your journal uncomment this following configuration.

  This will allow your system to keep 7 days of your data, up to 10G. Tweak it accordingly to your use case and capacity.

  it is recommended to use a separate storage unit from the journal for performance considerations.

  <journal-retention-directory period="7" unit="DAYS" storage-limit="10G">data/retention</journal-retention-directory>
  -->

  <journal-datasync>true</journal-datasync>

  <journal-min-files>2</journal-min-files>

  <journal-pool-files>10</journal-pool-files>

  <journal-device-block-size>4096</journal-device-block-size>

  <journal-file-size>10M</journal-file-size>
  
  <!--
   This value was determined through a calculation.
   Your system could perform 1.18 writes per millisecond
   on the current journal configuration.
   That translates as a sync write every 844000 nanoseconds.

   Note: If you specify 0 the system will perform writes directly to the disk.
         We recommend this to be 0 if you are using journalType=MAPPED and journal-datasync=false.
  -->
  <journal-buffer-timeout>844000</journal-buffer-timeout>


  <!--
    When using ASYNCIO, this will determine the writing queue depth for libaio.
   -->
  <journal-max-io>1</journal-max-io>
  <!--
    You can verify the network health of a particular NIC by specifying the <network-check-NIC> element.
     <network-check-NIC>theNicName</network-check-NIC>
    -->

  <!--
    Use this to use an HTTP server to validate the network
     <network-check-URL-list>http://www.apache.org</network-check-URL-list> -->

  <!-- <network-check-period>10000</network-check-period> -->
  <!-- <network-check-timeout>1000</network-check-timeout> -->

  <!-- this is a comma separated list, no spaces, just DNS or IPs
       it should accept IPV6

       Warning: Make sure you understand your network topology as this is meant to validate if your network is valid.
                Using IPs that could eventually disappear or be partially visible may defeat the purpose.
                You can use a list of multiple IPs, and if any successful ping will make the server OK to continue running -->
  <!-- <network-check-list>10.0.0.1</network-check-list> -->

  <!-- use this to customize the ping used for ipv4 addresses -->
  <!-- <network-check-ping-command>ping -c 1 -t %d %s</network-check-ping-command> -->

  <!-- use this to customize the ping used for ipv6 addresses -->
  <!-- <network-check-ping6-command>ping6 -c 1 %2$s</network-check-ping6-command> -->




  <!-- how often we are looking for how many bytes are being used on the disk in ms -->
  <disk-scan-period>5000</disk-scan-period>

  <!-- once the disk hits this limit the system will block, or close the connection in certain protocols
       that won't support flow control. -->
  <max-disk-usage>90</max-disk-usage>

  <!-- should the broker detect dead locks and other issues -->
  <critical-analyzer>true</critical-analyzer>

  <critical-analyzer-timeout>120000</critical-analyzer-timeout>

  <critical-analyzer-check-period>60000</critical-analyzer-check-period>

  <critical-analyzer-policy>HALT</critical-analyzer-policy>

  
  <page-sync-timeout>844000</page-sync-timeout>


        <!-- the system will enter into page mode once you hit this limit.
       This is an estimate in bytes of how much the messages are using in memory

        The system will use half of the available memory (-Xmx) by default for the global-max-size.
        You may specify a different value here if you need to customize it to your needs.

        <global-max-size>100Mb</global-max-size>

  -->

  <acceptors>

     <!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it -->
     <!-- amqpCredits: The number of credits sent to AMQP producers -->
     <!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark -->
     <!-- amqpDuplicateDetection: If you are not using duplicate detection, set this to false
                                  as duplicate detection requires applicationProperties to be parsed on the server. -->
     <!-- amqpMinLargeMessageSize: Determines how many bytes are considered large, so we start using files to hold their data.
                                   default: 102400, -1 would mean to disable large message control -->

     <!-- Note: If an acceptor needs to be compatible with HornetQ and/or Artemis 1.x clients add
                "anycastPrefix=jms.queue.;multicastPrefix=jms.topic." to the acceptor url.
                See https://issues.apache.org/jira/browse/ARTEMIS-1644 for more information. -->


     <!-- Acceptor for every supported protocol -->
     <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true;supportAdvisory=false;suppressInternalManagementObjects=false</acceptor>

     <!-- AMQP Acceptor.  Listens on default AMQP port for AMQP traffic.-->
     <acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true</acceptor>

     <!-- STOMP Acceptor. -->
     <acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>

     <!-- HornetQ Compatibility Acceptor.  Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
     <acceptor name="hornetq">tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>

     <!-- MQTT Acceptor -->
     <acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=false</acceptor>
     
     <!-- SSL Acceptor -->
    <acceptor name="netty-ssl-acceptor">tcp://0.0.0.0:61617?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;anycastPrefix=jms.queue;multicastPrefix=jms.topic.;sslEnabled=true;keyStorePath=E:/apache-artemis-2.18.0/bin/localBroker/etc/sprink.jks;keyStorePassword=changeit;trustStorePath=E:/apache-artemis-2.18.0/bin/localBroker/etc/sprinktrust.ts;trustStorePassword=changeit;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE</acceptor>

    <acceptor name ="mqtt+ssl">tcp://0.0.0.0:8883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;sslEnabled=true;keyStorePath=E:/apache-artemis-2.18.0/bin/localBroker/etc/sprink.jks;keyStorePassword=changeit;trustStorePath=E:/apache-artemis-2.18.0/bin/localBroker/etc/sprinktrust.ts;trustStorePassword=changeit;needClientAuth=true;protocols=MQTT;useEpoll=true</acceptor>

  </acceptors>


  <security-settings>
     <security-setting match="#">
        <permission type="createNonDurableQueue" roles="admins, users"/>
        <permission type="deleteNonDurableQueue" roles="admins, users"/>
        <permission type="createDurableQueue" roles="admins, users"/>
        <permission type="deleteDurableQueue" roles="admins, users"/>
        <permission type="createAddress" roles="admins, users"/>
        <permission type="deleteAddress" roles="admins, users"/>
        <permission type="consume" roles="admins, users"/>
        <permission type="browse" roles="admins, users"/>
        <permission type="send" roles="admins, users"/>
        <!-- we need this otherwise ./artemis data imp wouldn't work -->
        <permission type="manage" roles="admins"/>
     </security-setting>
  </security-settings>

  <address-settings>
     <!-- if you define auto-create on certain queues, management has to be auto-create -->
     <address-setting match="activemq.management#">
        <dead-letter-address>DLQ</dead-letter-address>
        <expiry-address>ExpiryQueue</expiry-address>
        <redelivery-delay>0</redelivery-delay>
        <!-- with -1 only the global-max-size is in use for limiting -->
        <max-size-bytes>-1</max-size-bytes>
        <message-counter-history-day-limit>10</message-counter-history-day-limit>
        <address-full-policy>PAGE</address-full-policy>
        <auto-create-queues>true</auto-create-queues>
        <auto-create-addresses>true</auto-create-addresses>
        <auto-create-jms-queues>true</auto-create-jms-queues>
        <auto-create-jms-topics>true</auto-create-jms-topics>
     </address-setting>
     <!--default for catch all-->
     <address-setting match="#">
        <dead-letter-address>DLQ</dead-letter-address>
        <expiry-address>ExpiryQueue</expiry-address>
        <redelivery-delay>0</redelivery-delay>
        <!-- with -1 only the global-max-size is in use for limiting -->
        <max-size-bytes>-1</max-size-bytes>
        <message-counter-history-day-limit>10</message-counter-history-day-limit>
        <address-full-policy>PAGE</address-full-policy>
        <auto-create-queues>true</auto-create-queues>
        <auto-create-addresses>true</auto-create-addresses>
        <auto-create-jms-queues>true</auto-create-jms-queues>
        <auto-create-jms-topics>true</auto-create-jms-topics>
        <auto-delete-queues>false</auto-delete-queues>
        <auto-delete-addresses>false</auto-delete-addresses>
     </address-setting>
  </address-settings>

  <addresses>
     <address name="DLQ">
        <anycast>
           <queue name="DLQ" />
        </anycast>
     </address>
     <address name="ExpiryQueue">
        <anycast>
           <queue name="ExpiryQueue" />
        </anycast>
     </address>

  </addresses>


  <!-- Uncomment the following if you want to use the standard LoggingActiveMQServerPlugin plugin to log events
  <broker-plugins>
     <broker-plugin class-name="org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin">
        <property key="LOG_ALL_EVENTS" value="true"/>
        <property key="LOG_CONNECTION_EVENTS" value="true"/>
        <property key="LOG_SESSION_EVENTS" value="true"/>
        <property key="LOG_CONSUMER_EVENTS" value="true"/>
        <property key="LOG_DELIVERING_EVENTS" value="true"/>
        <property key="LOG_SENDING_EVENTS" value="true"/>
        <property key="LOG_INTERNAL_EVENTS" value="true"/>
     </broker-plugin>
  </broker-plugins>
  -->

The connection factory configuration on the Spring Boot client is shown below.

import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;

import javax.jms.DeliveryMode;
import javax.jms.JMSException;


@Configuration
@EnableJms
public class MQTTConfig {

@Value("${JMS_BROKER_TRUSTSTORE}")
private String pathToTrustStore;

@Value("${JMS_BROKER_KEYSTORE}")
private String pathToKeystore;

@Value("${JMS_BROKER_TRUSTSTORE_PASSWORD}")
private String truststorePassword;

@Value("${JMS_BROKER_KEYSTORE_PASSWORD}")
private String keystorePassword;



@Bean
public ActiveMQConnectionFactory artemisSSLConnectionFactory() {
    ActiveMQConnectionFactory artemisConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61617?&" + "sslEnabled=true&" +
            "trustStorePath=" + pathToTrustStore + "&trustStorePassword=changeit");
    artemisConnectionFactory.setUser("user");
    artemisConnectionFactory.setPassword("password");
    return artemisConnectionFactory;
}

/**
 * Initialise {@link JmsTemplate} as required
 */
@Bean
public JmsTemplate jmsTemplate() throws JMSException {
    JmsTemplate jmsTemplate = new JmsTemplate();
    jmsTemplate.setConnectionFactory(artemisSSLConnectionFactory());
    jmsTemplate.setExplicitQosEnabled(true);

    //setting pubSubDomain to true configures JmsTemplate to work with topics instead of queues
    jmsTemplate.setPubSubDomain(true);
    return jmsTemplate;
}

/**
 * Initialise {@link DefaultJmsListenerContainerFactory} as required
 */
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() throws JMSException {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(artemisSSLConnectionFactory());
    //setting pubSubDomain to true configures the DefaultJmsListenerContainerFactory to work with topics instead of queues
    factory.setPubSubDomain(true);
    return factory;
}

}

Below is the POM file, showing only the relevant dependencies.

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.4.1.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-artemis</artifactId>
        <version>2.5.5</version>
</dependency>

The component that produces and consumes the messages is shown below.

import org.json.JSONObject; // assumed library; the original post does not show this import
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

@Component
public class SamplePC {

@Autowired
private JmsTemplate jmsTemplate;

//producer that is called by a cron job
public void tester() {
    JSONObject jsonObject = new JSONObject();
    jsonObject.put("serialNumber", "105");
    jmsTemplate.convertAndSend("server/forecast", jsonObject.toString().toCharArray());
}

//consumer (a message from the producer should be received here, but nothing arrives)
@JmsListener(destination = "server/forecast")
private void consumeWeatherForecastRequest(char[] incomingMessage) {
    //some logic
    jmsTemplate.convertAndSend("someTopic", "someMessage");
}

}

1 answer

徐鸿达
2023-03-14

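You have defined "anycastPrefix=jms.queue;multicastPrefix=jms.topic." on your SSL acceptor. You should remove both parameters. Another solution is to set enable1xPrefixes to false on the connection factory (but I think that is the default).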
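
To make the first suggestion concrete, here is a minimal sketch in plain Java of what the netty-ssl-acceptor URL looks like once the two prefix parameters are dropped. AcceptorUrlCleaner and stripPrefixParams are hypothetical names made up for this illustration and are not part of Artemis. In practice you would simply edit the acceptor line in the broker's XML file by hand and restart the broker. The URL in main is shortened from the one in the question.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not an Artemis API.
public class AcceptorUrlCleaner {

    // Removes the anycastPrefix and multicastPrefix parameters from an
    // acceptor URL and leaves every other parameter untouched.
    public static String stripPrefixParams(String acceptorUrl) {
        int queryStart = acceptorUrl.indexOf('?');
        if (queryStart < 0) {
            return acceptorUrl; // no parameters at all
        }
        String base = acceptorUrl.substring(0, queryStart);
        String query = acceptorUrl.substring(queryStart + 1);

        List<String> kept = new ArrayList<>();
        // The acceptor URLs in the question separate parameters with ';'.
        for (String param : query.split(";")) {
            String name = param.split("=", 2)[0];
            boolean isPrefix = name.equals("anycastPrefix")
                    || name.equals("multicastPrefix");
            if (!param.isEmpty() && !isPrefix) {
                kept.add(param);
            }
        }
        return kept.isEmpty() ? base : base + "?" + String.join(";", kept);
    }

    public static void main(String[] args) {
        String before = "tcp://0.0.0.0:61617?tcpSendBufferSize=1048576;"
                + "anycastPrefix=jms.queue;multicastPrefix=jms.topic.;"
                + "sslEnabled=true;protocols=CORE";
        System.out.println(stripPrefixParams(before));
    }
}
```

The sketch only rewrites a string. Whether removing the parameters is enough depends on the rest of the setup, for example on which Artemis client version the Spring Boot parent in the POM actually resolves.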
