当前位置: 首页 > 知识库问答 >
问题:

未设置活动MQ重新交付策略

郦昆
2023-03-14

我正在使用:

  • SpringBoot 2.0.4
  • ActiveMQ 5.15.5
  • Apache Camel 2.22.0
  • Java1.8
  • 太棒了
  • 马文

基本上,我有一个带有Apache Camel路由的SpringBoot应用程序,它使用来自ActiveMQ的消息和事务。我需要在ActiveMQ上设置一个Re的策略,所以当处理中发生错误时,消息会被重试多次。

我已经用bean为ActiveMQ创建了一个配置类,事务按预期工作,但ReddeliveryPolicy不工作。有人能帮我弄明白这是怎么回事吗?

以下是产生错误的消息的日志输出:

2018-10-23 10:35:2005年8月28日调试10524---[mer〔entryQueue〕]o.a.c.s.spi。TransactionErrorHandler:事务开始(0x35d60381)已为(MessageId:ID:EPIC-LAP-25-50304-1540306817804:3:1:1:2 on ExchangeId:ID-EPIC-LAP-25-1540312510586-0-1))2018-10-23 10:35:28.020 DEBUG 10524---[mer[entryQueue]]o.apache.camel.processor重新传递(false)。发送处理器:

这是我的 ActiveMQ 配置类:

import org.apache.activemq.ActiveMQConnectionFactory
import org.apache.activemq.RedeliveryPolicy
import org.apache.activemq.camel.component.ActiveMQComponent
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.jms.connection.JmsTransactionManager

import javax.jms.DeliveryMode

@Configuration
class ActiveMQConfiguration {

    @Bean
    ActiveMQConnectionFactory activeMQConnectionFactory() {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory()
        activeMQConnectionFactory.brokerURL = 'tcp://localhost:61616'
        activeMQConnectionFactory.userName = 'admin'
        activeMQConnectionFactory.password = 'admin'

        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy()
        redeliveryPolicy.maximumRedeliveries = 3
        redeliveryPolicy.redeliveryDelay = 150L
        redeliveryPolicy.useExponentialBackOff = true
        redeliveryPolicy.backOffMultiplier = 1.5

        activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy)

        activeMQConnectionFactory
    }

    @Bean
    ActiveMQComponent activeMQComponent(@Qualifier('activeMQConnectionFactory')ActiveMQConnectionFactory activeMQConnectionFactory) {
        ActiveMQComponent activeMQComponent = new ActiveMQComponent()
        activeMQComponent.connectionFactory = activeMQConnectionFactory
        activeMQComponent.transacted = true
        activeMQComponent.transactionManager = txManager()
        activeMQComponent.cacheLevelName = 'CACHE_CONSUMER'
        activeMQComponent.lazyCreateTransactionManager = false
        activeMQComponent.deliveryMode = DeliveryMode.PERSISTENT

        activeMQComponent
    }

    @Bean
    JmsTransactionManager txManager(@Qualifier('activeMQConnectionFactory') ActiveMQConnectionFactory activeMQConnectionFactory) {
        JmsTransactionManager txManager = new JmsTransactionManager()
        txManager.connectionFactory = activeMQConnectionFactory
        txManager.rollbackOnCommitFailure = true

        txManager
    }

}

共有2个答案

陈浩
2023-03-14

不久前,我在dlq队列方面遇到了问题——不是代码中设置的所有参数都有效。我必须向acitvemq配置添加设置。是的,划分配置不是一个好的决定,但是我没有找到其他的。下面是我的jms配置类和一个通过activemq.xml的队列配置示例:

@Configuration
@EnableJms
public class JmsConfig {

    private Environment env;

    @Autowired
    public void setEnv(Environment env) {
        this.env = env;
    }

    @Bean(name = "activemq")
    public ActiveMQComponent activemq(@Qualifier("activemqTransactionManager") JmsTransactionManager jmsTransactionManager,
                                      @Qualifier("activemqConnectionFactory") ConnectionFactory connectionFactory) {
        ActiveMQComponent activeMQComponent = new ActiveMQComponent();
        activeMQComponent.setTransactionManager(jmsTransactionManager);
        activeMQComponent.setConnectionFactory(connectionFactory);
        return activeMQComponent;
    }

    @Bean(name = "activemqJmsTemplate")
    public JmsTemplate jmsTemplate(@Qualifier("activemqConnectionFactory") ConnectionFactory connectionFactory) {
        JmsTemplate template = new JmsTemplate();
        template.setConnectionFactory(connectionFactory);
        return template;
    }

    @Bean(name = "activemqTransactionPolicy")
    public SpringTransactionPolicy activemqTransactionPolicy(
            @Qualifier("activemqTransactionManager") JmsTransactionManager jmsTransactionManager) {
        SpringTransactionPolicy springTransactionPolicy = new SpringTransactionPolicy(jmsTransactionManager);
        springTransactionPolicy.setPropagationBehaviorName("PROPAGATION_REQUIRED");
        return springTransactionPolicy;
    }

    @Bean(name = "activemqTransactionManager")
    public JmsTransactionManager activemqTransactionManager(
            @Qualifier("activemqConnectionFactory") ConnectionFactory connectionFactory) {
        return new JmsTransactionManager(connectionFactory);
    }

    @Bean(name = "activemqConnectionFactory")
    public ConnectionFactory connectionFactory(@Qualifier("activemqRedeliveryPolicy") RedeliveryPolicy redeliveryPolicy) {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL("tcp://" + env.getProperty("queue.url"));
        connectionFactory.setTrustAllPackages(true);

        RedeliveryPolicyMap map = connectionFactory.getRedeliveryPolicyMap();
        map.put(new ActiveMQQueue("queueName.DLQ"), redeliveryPolicy);
        return connectionFactory;
    }

    @Bean(name = "activemqRedeliveryPolicy")
    public RedeliveryPolicy redeliveryPolicy() {
        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        redeliveryPolicy.setMaximumRedeliveries(0);
        return redeliveryPolicy;
    }
}

activevq.xml 中的更改:

<destinationPolicy>
    <policyMap>
        <policyEntries>
            <!--set dead letter queue for our queue. It name will be "myQueueName.DLQ"-->
            <policyEntry queue="myQueueName">
                <deadLetterStrategy>
                    <individualDeadLetterStrategy queuePrefix="" queueSuffix=".DLQ"/>
                </deadLetterStrategy>
            </policyEntry>
            <policyEntry topic=">">
                <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                </pendingMessageLimitStrategy>
            </policyEntry>
        </policyEntries>
    </policyMap>
</destinationPolicy>

<plugins>
<redeliveryPlugin fallbackToDeadLetter="true" sendToDlqIfMaxRetriesExceeded="true">
    <redeliveryPolicyMap>
        <redeliveryPolicyMap>
            <redeliveryPolicyEntries>
                <!--Set the redelivery delay to one hour-->
                <redeliveryPolicy queue="myQueueName.DLQ" maximumRedeliveries="-1" redeliveryDelay="3600000"/>
            </redeliveryPolicyEntries>
        </redeliveryPolicyMap>
    </redeliveryPolicyMap>
</redeliveryPlugin>
</plugins>
叶淇
2023-03-14

这里有两个问题

1. 您有两个事务管理器

由于CamelActiveMQ组件的配置中有以下两行,因此需要配置两个事务管理器。这是问题的根源。

activeMQComponent.transacted = true // activates local JMS transactions
activeMQComponent.transactionManager = txManager() // additional tx manager

如果你只想从ActiveMQ使用事务,你不需要配置Spring事务管理器。

您的这两行配置足以获得ActiveMQ代理的本地事务。

activeMQComponent.transacted = true
activeMQComponent.lazyCreateTransactionManager = false

因此,您应该删除这一行以及整个txManagerbean

activeMQComponent.transactionManager = txManager()

如果您当前在骆驼路线中设置了事务处理标志,则也必须删除该标志。正如我所写的,即使删除了所有这些,从ActiveMQ消费的路由仍然是事务性的。

2.重新交付不起作用

您还没有发布您的Camel路由,但根据错误输出,我假设代理不重新交付,因为错误由Camel处理。

它是Camel错误处理程序<code>o.a.Camel.processor。DefaultErrorHandler</code>,当错误发生时,它将启动,因为它处理错误,所以消息将针对代理提交,因此不会发生重新传递。

尝试禁用 Camel 错误处理,以查看代理是否在出现错误时重新传递消息。

errorHandler(noErrorHandler());
 类似资料:
  • 问题 在我看来不一致的是,如果您连续发布消息,Artemis似乎保留了顺序,而如果消息之间有轻微的延迟,那么队列不会阻塞,只有失败的消息被延迟调度(根据文档)。 我试图找到一个解决方案,这样如果一个消息失败,并需要在10分钟内重新传递,它不会阻止后续的消息。

  • 谁能解释一下ActiveMQ重新交付策略实际上是如何工作的?它是在客户端还是在服务器端工作? 假设我有一个重新传递策略,每次尝试之间间隔30分钟,重新传递消息长达10分钟,那么失败的消息到底在哪里? 假设消息现在失败了,30分钟后重新发送,那么消息在哪里? http://activemq.apache.org/redelivery-policy.html http://activemq.apach

  • 我正在使用ActiveMQ Artemis 2.17和Spring Boot 2.5.7。我正在发布关于主题和队列的消息并使用它。所有这些都是通过JMS完成的。所有队列(选播或多播)都是耐用的。我的主题(多播地址)有两个持久队列,以便有两个独立的使用者。在我的主题中,这两个消费者使用持久和共享订阅(JMS 2.0)。所有处理都是事务性的,通过Atomikos事务管理器进行管理(我需要它来提交数据库

  • 设置: 我们有一个Spring Boot应用程序,它正在从ActiveMQ Artemis JMS队列读取消息 这些消息正在JPA事务中处理 当在JPA中有一个异常触发回滚时,它也会在Artemis中触发一个JMS回滚,而Artemis设置了重新交付延迟 我们的应用程序在多个实例中并行运行,这在处理共享公共数据的多条消息时会导致乐观锁定问题 问题:当X消息被并行处理并且存在乐观锁定问题时,只有一个

  • 如何配置我的以丢弃未使用的消息? 我不希望我的客户端在订阅服务器队列时收到服务器发送的所有旧消息。 这是我目前的经纪人: 目前,我的服务器首先启动并发送大约10条消息。之后,我的客户机订阅队列,并接收在没有客户机订阅时发送的所有10条消息。我不想要这种行为。 解决 然后我发出这样一条信息: 解决方案2 我创建了一个主题而不是队列。 http://Java sample approach . com