当前位置: 首页 > 知识库问答 >
问题:

ActiveMQ-主题的重新交付策略和死信队列

侯向文
2023-03-14

我正在使用ActiveMQ Artemis 2.17和Spring Boot 2.5.7。我正在发布关于主题和队列的消息并使用它。所有这些都是通过JMS完成的。所有队列(选播或多播)都是耐用的。我的主题(多播地址)有两个持久队列,以便有两个独立的使用者。在我的主题中,这两个消费者使用持久和共享订阅(JMS 2.0)。所有处理都是事务性的,通过Atomikos事务管理器进行管理(我需要它来提交数据库的两个阶段)。

我有一个问题与再交付政策和DLQ。当我在处理消息期间抛出异常时,重新传送策略将正确应用于队列(anycast队列),并且将使用消息创建DLQ。但是,对于主题(多播队列),重新传送策略不适用,并且消息不会发送到DLQ中。

这是我的ActiveMQ Artemis代理配置:

<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->

<configuration xmlns="urn:activemq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">

    <core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="urn:activemq:core ">

        <name>0.0.0.0</name>


        <!-- Codec and key used to encode the passwords -->
        <!-- TODO : set master-password configuration into the Java code -->
        <password-codec>org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec;key=UBNTd0dS9w6f8HDIyGW9
        </password-codec>


        <!-- Configure the persistence into a database (postgresql) -->
        <persistence-enabled>true</persistence-enabled>
        <store>
            <database-store>
                <bindings-table-name>BINDINGS_TABLE</bindings-table-name>
                <message-table-name>MESSAGE_TABLE</message-table-name>
                <page-store-table-name>PAGE_TABLE</page-store-table-name>
                <large-message-table-name>LARGE_MESSAGES_TABLE</large-message-table-name>
                <node-manager-store-table-name>NODE_MANAGER_TABLE</node-manager-store-table-name>
                <jdbc-lock-renew-period>2000</jdbc-lock-renew-period>
                <jdbc-lock-expiration>20000</jdbc-lock-expiration>
                <jdbc-journal-sync-period>5</jdbc-journal-sync-period>
                <!-- Configure a connection pool -->
                <data-source-properties>
                    <data-source-property key="driverClassName" value="org.postgresql.Driver"/>
                    <data-source-property key="url" value="jdbc:postgresql://localhost/artemis"/>
                    <data-source-property key="username" value="postgres"/>
                    <data-source-property key="password" value="ENC(-3eddbe9664c85ec7ed63588b000486cb)"/>
                    <data-source-property key="poolPreparedStatements" value="true"/>
                    <data-source-property key="initialSize" value="2"/>
                    <data-source-property key="minIdle" value="1"/>
                </data-source-properties>
            </database-store>
        </store>


        <!-- Configure the addresses, queues and topics default behaviour -->
        <!-- See: https://activemq.apache.org/components/artemis/documentation/latest/address-model.html -->
        <address-settings>
            <address-setting match="#">
                <dead-letter-address>DLA</dead-letter-address>
                <auto-create-dead-letter-resources>true</auto-create-dead-letter-resources>
                <dead-letter-queue-prefix/>
                <dead-letter-queue-suffix>.DLQ</dead-letter-queue-suffix>
                <expiry-address>ExpiryQueue</expiry-address>
                <auto-create-expiry-resources>false</auto-create-expiry-resources>
                <expiry-queue-prefix/>
                <expiry-queue-suffix>.EXP</expiry-queue-suffix>
                <expiry-delay>-1</expiry-delay>
                <max-delivery-attempts>5</max-delivery-attempts>
                <redelivery-delay>250</redelivery-delay>
                <redelivery-delay-multiplier>2.0</redelivery-delay-multiplier>
                <redelivery-collision-avoidance-factor>0.5</redelivery-collision-avoidance-factor>
                <max-redelivery-delay>10000</max-redelivery-delay>
                <max-size-bytes>100000</max-size-bytes>
                <page-size-bytes>20000</page-size-bytes>
                <page-max-cache-size>5</page-max-cache-size>
                <max-size-bytes-reject-threshold>-1</max-size-bytes-reject-threshold>
                <address-full-policy>PAGE</address-full-policy>
                <message-counter-history-day-limit>0</message-counter-history-day-limit>
                <default-last-value-queue>false</default-last-value-queue>
                <default-non-destructive>false</default-non-destructive>
                <default-exclusive-queue>false</default-exclusive-queue>
                <default-consumers-before-dispatch>0</default-consumers-before-dispatch>
                <default-delay-before-dispatch>-1</default-delay-before-dispatch>
                <redistribution-delay>0</redistribution-delay>
                <send-to-dla-on-no-route>true</send-to-dla-on-no-route>
                <slow-consumer-threshold>-1</slow-consumer-threshold>
                <slow-consumer-policy>NOTIFY</slow-consumer-policy>
                <slow-consumer-check-period>5</slow-consumer-check-period>
                <!-- We disable the automatic creation of queue or topic -->
                <auto-create-queues>false</auto-create-queues>
                <auto-delete-queues>true</auto-delete-queues>
                <auto-delete-created-queues>false</auto-delete-created-queues>
                <auto-delete-queues-delay>30000</auto-delete-queues-delay>
                <auto-delete-queues-message-count>0</auto-delete-queues-message-count>
                <config-delete-queues>OFF</config-delete-queues>
                <!-- We disable the automatic creation of address -->
                <auto-create-addresses>false</auto-create-addresses>
                <auto-delete-addresses>true</auto-delete-addresses>
                <auto-delete-addresses-delay>30000</auto-delete-addresses-delay>
                <config-delete-addresses>OFF</config-delete-addresses>
                <management-browse-page-size>200</management-browse-page-size>
                <default-purge-on-no-consumers>false</default-purge-on-no-consumers>
                <default-max-consumers>-1</default-max-consumers>
                <default-queue-routing-type>ANYCAST</default-queue-routing-type>
                <default-address-routing-type>ANYCAST</default-address-routing-type>
                <default-ring-size>-1</default-ring-size>
                <retroactive-message-count>0</retroactive-message-count>
                <enable-metrics>true</enable-metrics>
                <!-- We automatically force group rebalance and a dispatch pause during group rebalance -->
                <default-group-rebalance>true</default-group-rebalance>
                <default-group-rebalance-pause-dispatch>true</default-group-rebalance-pause-dispatch>
                <default-group-buckets>1024</default-group-buckets>
            </address-setting>
        </address-settings>


        <!-- Define the protocols accepted -->
        <!-- See: https://activemq.apache.org/components/artemis/documentation/latest/protocols-interoperability.html -->
        <acceptors>
            <!-- Acceptor for only CORE protocol -->
            <!-- We enable the cache destination as recommended into the documentation. See: https://activemq.apache.org/components/artemis/documentation/latest/using-jms.html -->
            <acceptor name="artemis">
                tcp://0.0.0.0:61616?protocols=CORE,tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;useEpoll=true,cacheDestinations=true
            </acceptor>
        </acceptors>


        <!-- Define how to connect to another broker -->
        <!-- TODO : est-ce utile ? -->
        <connectors>
            <connector name="netty-connector">tcp://localhost:61616</connector>
            <connector name="broker1-connector">tcp://localhost:61616</connector>
            <connector name="broker2-connector">tcp://localhost:61617</connector>
        </connectors>

        <!-- Configure the High-Availability and broker cluster for the high-availability -->
        <ha-policy>
            <shared-store>
                <master>
                    <failover-on-shutdown>true</failover-on-shutdown>
                </master>
            </shared-store>
        </ha-policy>

        <cluster-connections>
            <cluster-connection name="gerico-cluster">
                <connector-ref>netty-connector</connector-ref>
                <static-connectors>
                    <connector-ref>broker1-connector</connector-ref>
                    <connector-ref>broker2-connector</connector-ref>
                </static-connectors>
            </cluster-connection>
        </cluster-connections>

        <!--       <cluster-user>cluster_user</cluster-user>-->
        <!--       <cluster-password>cluster_user_password</cluster-password>-->

        <!-- should the broker detect dead locks and other issues -->
        <critical-analyzer>true</critical-analyzer>

        <critical-analyzer-timeout>120000</critical-analyzer-timeout>

        <critical-analyzer-check-period>60000</critical-analyzer-check-period>

        <critical-analyzer-policy>HALT</critical-analyzer-policy>


        <page-sync-timeout>72000</page-sync-timeout>


        <!-- the system will enter into page mode once you hit this limit.
       This is an estimate in bytes of how much the messages are using in memory

        The system will use half of the available memory (-Xmx) by default for the global-max-size.
        You may specify a different value here if you need to customize it to your needs.

        <global-max-size>100Mb</global-max-size>

  -->

        <!-- Security configuration -->
        <security-enabled>false</security-enabled>

        <!-- Addresses and queues configuration -->
        <!-- !!! DON'T FORGET TO UPDATE 'slave-broker.xml' FILE !!! -->
        <addresses>
            <address name="topic.test_rde">
                <multicast>
                    <queue name="rde_receiver_1">
                        <durable>true</durable>
                    </queue>
                    <queue name="rde_receiver_2">
                        <durable>true</durable>
                    </queue>
                </multicast>
            </address>
            <address name="queue.test_rde">
                <anycast>
                    <queue name="queue.test_rde">
                        <durable>true</durable>
                    </queue>
                </anycast>
            </address>
        </addresses>
    </core>
</configuration>

Spring Boot中的JMS配置如下:

    @Bean
    public DynamicDestinationResolver destinationResolver() {
        return new DynamicDestinationResolver() {
            @Override
            public Destination resolveDestinationName(Session session, String destinationName, boolean pubSubDomain) throws JMSException {
                if (destinationName.startsWith("topic.")) {
                    pubSubDomain = true;
                }
                else
                    pubSubDomain =false;
                return super.resolveDestinationName(session, destinationName, pubSubDomain);
            }
        };
    }

    @Bean
    public JmsListenerContainerFactory<?> queueConnectionFactory(ConnectionFactory connectionFactory,
                                                                 DefaultJmsListenerContainerFactoryConfigurer configurer,
                                                                 JmsErrorHandler jmsErrorHandler) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        factory.setPubSubDomain(false);

        factory.setSessionTransacted(true);

        factory.setErrorHandler(jmsErrorHandler);

        return factory;
    }

    @Bean
    public JmsListenerContainerFactory<?> topicConnectionFactory(ConnectionFactory connectionFactory,
                                                                 DefaultJmsListenerContainerFactoryConfigurer configurer,
                                                                 JmsErrorHandler jmsErrorHandler) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        // This provides all boot's default to this factory, including the message converter
        configurer.configure(factory, connectionFactory);
        factory.setPubSubDomain(true);

        factory.setSessionTransacted(true);
        factory.setSubscriptionDurable(true);
        factory.setSubscriptionShared(true);

        factory.setErrorHandler(jmsErrorHandler);

        return factory;
    }

消息发布者:

    @GetMapping("api/send")
    public void sendDataToJms() throws InterruptedException {

        OrderRest currentService = this.applicationContext.getBean(OrderRest.class);
        for (Long i = 0L; i < 1L; i++) {
            currentService.sendTopicMessage(i);
            currentService.sendQueueMessage(i);
            Thread.sleep(200L);
        }
    }

    @Transactional
    public void sendTopicMessage(Long id) {
        Order myMessage = new Order("--" + id.toString() + "--", new Date());
        jmsTemplate.convertAndSend("topic.test_rde", myMessage);
    }
    @Transactional
    public void sendQueueMessage(Long id) {
        Order myMessage = new Order("--" + id.toString() + "--", new Date());
        jmsTemplate.convertAndSend("queue.test_rde", myMessage);
    }

听众们:

    @Transactional
    @JmsListener(destination = "topic.test_rde", containerFactory = "topicConnectionFactory", subscription = "rde_receiver_1")
    public void receiveMessage_rde_1(@Payload Order order, @Headers MessageHeaders headers, Message message, Session session) {
        log.info("---> Consumer1 - rde_receiver_1 - " + order.getContent());

            throw new ValidationException("Voluntary exception", "entity", List.of(), List.of());
    }

    @Transactional
    @JmsListener(destination = "queue.test_rde", containerFactory = "queueConnectionFactory")
    public void receiveMessage_rde_queue(@Payload Order order, @Headers MessageHeaders headers, Message message, Session session) {
        log.info("---> Consumer1 - rde_receiver_queue - " + order.getContent());

        throw new ValidationException("Voluntary exception", "entity", List.of(), List.of());
    }

返回策略和DLQ机制只适用于队列(anycat队列),这正常吗?
是否有可能将其应用于主题(多播队列)和共享持久订阅?
如果没有,我怎么能有主题行为,但与再交付和DLQ机制?我应该使用ActiveMQ的转移解决方案吗?

非常感谢你的帮助。当做

共有1个答案

章稳
2023-03-14

我发现了问题。它来自不支持JMS v2的Atomikos。0,但只有JMS 1.1。因此,不可能获得同时支持2-PC和重新交付策略的主题的共享持久订阅。

 类似资料:
  • 我正在设计一种机制来处理消费者没有收到的消息。

  • 我有一个SNS主题&订阅(实际上不止1个)设置来使用SQS DLQ。然而,每一个都告诉我,我有一个策略错误。 我的SNS订阅设置了DLQ: 我的队列存在: 我还尝试在队列上使用一个真正通用的访问策略: 我遵循的是:https://docs.aws.amazon.com/sns/latest/dg/sns-configure-dead-letter-queue.html(步骤5解释了设置策略) 其他

  • 谁能解释一下ActiveMQ重新交付策略实际上是如何工作的?它是在客户端还是在服务器端工作? 假设我有一个重新传递策略,每次尝试之间间隔30分钟,重新传递消息长达10分钟,那么失败的消息到底在哪里? 假设消息现在失败了,30分钟后重新发送,那么消息在哪里? http://activemq.apache.org/redelivery-policy.html http://activemq.apach

  • 问题 在我看来不一致的是,如果您连续发布消息,Artemis似乎保留了顺序,而如果消息之间有轻微的延迟,那么队列不会阻塞,只有失败的消息被延迟调度(根据文档)。 我试图找到一个解决方案,这样如果一个消息失败,并需要在10分钟内重新传递,它不会阻止后续的消息。

  • 设置: 我们有一个Spring Boot应用程序,它正在从ActiveMQ Artemis JMS队列读取消息 这些消息正在JPA事务中处理 当在JPA中有一个异常触发回滚时,它也会在Artemis中触发一个JMS回滚,而Artemis设置了重新交付延迟 我们的应用程序在多个实例中并行运行,这在处理共享公共数据的多条消息时会导致乐观锁定问题 问题:当X消息被并行处理并且存在乐观锁定问题时,只有一个

  • 我正在使用: SpringBoot 2.0.4 ActiveMQ 5.15.5 Apache Camel 2.22.0 Java1.8 太棒了 马文 基本上,我有一个带有Apache Camel路由的SpringBoot应用程序,它使用来自ActiveMQ的消息和事务。我需要在ActiveMQ上设置一个Re的策略,所以当处理中发生错误时,消息会被重试多次。 我已经用bean为ActiveMQ创建了