当前位置: 首页 > 知识库问答 >
问题:

ActiveMQ Artemis是否支持更新最后一个值队列中的计划消息?

鞠晋
2023-03-14

在我对Artemis LastValueQueue代码的测试和回顾中,消息的调度延迟似乎优先于对“last value key”的评估。换言之,如果您安排了一条消息,那么它只会在准备传递时替换队列中的最后一个值。

我的问题是我是否正确理解了代码,如果是,是否有一个解决方案或ActiveMQ/Artemis的功能可以帮助满足我们的要求。

我们的要求如下:

  1. 生成一条消息,并将该消息的处理延迟到将来的某个时间点(通常为30秒)
  2. 如果由于新的外部事件生成了消息的更新版本,则用消息的新版本替换任何现有的计划消息-除了消息有效负载外,还应更新计划的传递时间

其他一些注意事项:

  • 我目前的原型使用的是Artemis嵌入服务器
  • Spring-jms JmsTemboard正在被用来产生消息
  • Spring-jms JmsListenerContainerFactory正在被用于消耗消息
  • 我们目前没有使用SpringBoot,所以您将在下面看到一些bean设置。

ArtemisConfig.java:

@Configuration
@EnableJms
public class ArtemisConfig {

    @Bean
    public org.apache.activemq.artemis.core.config.Configuration configuration() throws Exception {
        org.apache.activemq.artemis.core.config.Configuration config = new ConfigurationImpl();
        config.addAcceptorConfiguration("in-vm", "vm://0");
        config.setPersistenceEnabled(true);
        config.setSecurityEnabled(false);
        config.setJournalType(JournalType.ASYNCIO);
        config.setCreateJournalDir(true);
        config.setJournalDirectory("/var/mq/journal");
        config.setBindingsDirectory("/var/mq/bindings");
        config.setLargeMessagesDirectory("/var/mq/large-messages");
        config.setJMXManagementEnabled(true);

        QueueConfiguration queueConfiguration = new QueueConfiguration("MYLASTVALUEQUEUE");
        queueConfiguration.setAddress("MYLASTVALUEQUEUE");
        queueConfiguration.setLastValueKey("uniqueJobId");
        queueConfiguration.setDurable(true);
        queueConfiguration.setEnabled(true);
        queueConfiguration.setRoutingType(RoutingType.ANYCAST);

        CoreAddressConfiguration coreAddressConfiguration = new CoreAddressConfiguration();
        coreAddressConfiguration.addQueueConfiguration(queueConfiguration);

        config.addAddressConfiguration(coreAddressConfiguration);

        return config;
    }

    @Bean
    public EmbeddedActiveMQ artemisServer() throws Exception {
        EmbeddedActiveMQ server = new EmbeddedActiveMQ();
        server.setConfiguration(configuration());
        server.start();

        return server;
    }

    @PreDestroy
    public void preDestroy() throws Exception {
        artemisServer().stop();
    }

    @Bean
    public ConnectionFactory activeMqConnectionFactory() throws Exception {
        return ActiveMQJMSClient.createConnectionFactory("vm://0", "artemis-client");
    }


    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerFactory() throws Exception {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(activeMqConnectionFactory());
        factory.setSessionTransacted(true);
        factory.setConcurrency("8");
        factory.setMessageConverter(jacksonJmsMessageConverter());
        return factory;
    }

    @Bean
    public MessageConverter jacksonJmsMessageConverter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setTargetType(MessageType.TEXT);
        converter.setTypeIdPropertyName("_type");
        return converter;
    }

    @Bean
    public JmsTemplate jmsTemplate() throws Exception {
        JmsTemplate jmsTemplate = new JmsTemplate(activeMqConnectionFactory());
        jmsTemplate.setMessageConverter(jacksonJmsMessageConverter());
        jmsTemplate.setDeliveryPersistent(true);

        return jmsTemplate;
    }

    @Bean
    QueueMessageService queueMessageService() {
        return new QueueMessageService();
    }
}

QueueMessageService。JAVA

public class QueueMessageService {
    @Resource
    private JmsTemplate jmsTemplate;

    public void queueJobRequest(
            final String queue,
            final int priority,
            final long deliveryDelayInSeconds,
            final MyMessage message) {

        jmsTemplate.convertAndSend(queue, jobRequest, message -> {
            message.setJMSPriority(priority);
            if (deliveryDelayInSeconds > 0 && deliveryDelayInSeconds <= 86400) {
                message.setLongProperty(
                        Message.HDR_SCHEDULED_DELIVERY_TIME.toString(),
                        Instant.now().plus(deliveryDelayInSeconds, ChronoUnit.SECONDS).toEpochMilli()
                );
            }
            message.setStringProperty(Message.HDR_LAST_VALUE_NAME.toString(), "uniqueJobId");
            message.setStringProperty("uniqueJobId", jobRequest.getUniqueJobId().toString());
            return message;
        });
    }
}

共有1个答案

束志业
2023-03-14

您对带有最后一个值队列的计划消息的语义的理解是正确的。当消息被调度时,从技术上讲,它还不在队列中。在计划时间到达强制执行最后一个值队列语义的点之前,不会将其放入队列。

除了实现一个新功能之外,我看不出你如何以任何自动的方式实现你想要的行为。在这一点上,我的建议是使用管理API(即QueueControl)在发送新计划消息之前手动删除旧计划消息。您可以使用RemveMessage方法中的一个,因为它们可以处理计划的消息和非计划的消息。

 类似资料:
  • 问题内容: 由于Android当前不支持java7,我发现自己想知道他们是否在致力于支持Java7? 问题答案: 令我印象深刻的是,Android并未使用oracle java,而是使用了部分和谐的java:http : //en.wikipedia.org/wiki/Apache_Harmony#Use_in_Android_SDK http://arstechnica.com/open-sou

  • 分支 开始时间 积极支持截止时间 安全维护截止时间 v4.3.x 2019-2-7 2019-9-30 2019-12-31 v4.4.x 2019-4-15 2020-4-30 2020-7-31 v4.5.x 2019-12-20 2020-12-31 2021-3-31 积极支持 受到官方开发组的积极支持,已报告的错误和安全问题将会立即被修复,并按照常规流程发布正式的版本。 安全维护 仅支持

  • 我正在为Azure服务总线使用最新的Java绑定(V3.1.3):https://github.com/Azure/azure-sdk-for-java/tree/master/sdk/servicebus 当我创建一个新的队列客户端,计划一条消息,然后取消它... ...代码似乎按预期工作:活动消息计数变为0。但是一旦被调度的消息到达它应该被调度的时间(我测试了10秒和100秒以后),消息有时会

  • 问题内容: 我在寻找android文档中的postDelayed 发布延迟文档 基本上,这就是文档对该方法的说法-“导致将Runnable添加到消息队列中,并在经过指定的时间后运行。该Runnable将在用户界面线程上运行。” 我知道每个线程都有一个与之关联的消息队列,循环程序和处理程序。- 什么是Android的尺蠖,处理程序和的MessageQueue之间的关系?。就“在指定的时间段后运行”而

  • 我翻阅了rabbitmq文档,似乎rabbitmq不处理消息重新传递计数。如果我要手动确认/NACK消息,我需要将重试计数保存在内存中(例如,使用correlationId作为映射中的唯一键),或者在消息中设置我自己的头并重新发送它(从而将其放在队列的末尾) 然而,这是一个Spring处理的情况。具体来说,我指的是RetryInterceptorBuilder.stateful()。maxAtte

  • Cap'n Proto是否支持分隔消息? 我的目标是将多条消息写入文件管道,并在写入时实时读取。 所以... > 解析器必须能够检测不完整的消息并等待。