当前位置: 首页 > 知识库问答 >
问题:

_amq_group_id存在于消息中,但@jmslistener中的JMSXGroupID为空

呼延骏俊
2023-03-14

消息组中的消息共享相同的组id,即它们具有相同的组标识符属性(JMS为JMSXGroupID,Apache ActiveMQ Artemis Core API为_AMQ_GROUP_ID)。

当我浏览值为product=paper的代理中的消息时,我可以看到为什么最初通过jmsxgroupid设置的属性变成了_amq_group_id。但是,在我的@jmslistener注释方法中,我可以看到_amq_group_id属性丢失,并且jmsxgroupid在消息的headershashmap中显示为null。

@JmsListener(destination = "${artemis.destination}", subscription = "${artemis.subscriptionName}",
            containerFactory = "containerFactory", concurrency = "15-15")
public void consumeMessage(Message<StatefulSpineEvent<?>> eventMessage)

所以

    null
@Component
public class GroupIdMessageMapper extends SimpleJmsHeaderMapper {

    @Override
    public MessageHeaders toHeaders(Message jmsMessage) {

        MessageHeaders messageHeaders = super.toHeaders(jmsMessage);

        Map<String, Object> messageHeadersMap = new HashMap<>(messageHeaders);

        try {
            messageHeadersMap.put("JMSXGroupID", jmsMessage.getStringProperty("_AMQ_GROUP_ID"));
        } catch (JMSException e) {
            e.printStackTrace();
        }

        // can see while debugging that this returns the correct headers
        return new MessageHeaders(messageHeadersMap);
    }
}
@Component
public class CustomSpringJmsListener {

    protected final Logger LOG = LoggerFactory.getLogger(getClass());

    @JmsListener(destination = "local-queue", subscription = "groupid-example",
            containerFactory = "myContainerFactory", concurrency = "15-15")
    public void receive(Message message) throws JMSException {
        LOG.info("Received message: " + message);
    }
}
@SpringBootApplication
@EnableJms
public class GroupidApplication implements CommandLineRunner {

    private static Logger LOG = LoggerFactory
            .getLogger(GroupidApplication.class);

    @Autowired
    private JmsTemplate jmsTemplate;

    @Autowired MessageConverter messageConverter;

    public static void main(String[] args) {
        LOG.info("STARTING THE APPLICATION");
        SpringApplication.run(GroupidApplication.class, args);

        LOG.info("APPLICATION FINISHED");
    }

    @Override
    public void run(String... args) {
        LOG.info("EXECUTING : command line runner");

        jmsTemplate.setPubSubDomain(true);

        createAndSendObjectMessage("Message1");
        createAndSendTextMessage("Message2");
        createAndSendTextMessage("Message3");
        createAndSendTextMessage("Message4");
        createAndSendTextMessage("Message5");
        createAndSendTextMessage("Message6");
    }

    private void createAndSendTextMessage(String messageBody) {
        jmsTemplate.send("local-queue", session -> {
            Message message = session.createTextMessage(messageBody);

            message.setStringProperty("JMSXGroupID", "product=paper");

            return message;
        });
    }

    // BEANS

    @Bean
    public JmsListenerContainerFactory<?> myContainerFactory(ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        // This provides all boot's default to this factory, including the message converter
        configurer.configure(factory, connectionFactory);
        // You could still override some of Boot's default if necessary.
        factory.setSubscriptionDurable(true);
        factory.setSubscriptionShared(true);
        factory.setMessageConverter(messagingMessageConverter());

        return factory;
    }

    @Bean
    public MessagingMessageConverter messagingMessageConverter() {
        return new MessagingMessageConverter(messageConverter, new GroupIdMessageMapper());
    }
}

SimpleJmsHeaderMapper调用位置的堆栈跟踪:

ToHeaders:130,SimpleJmsHeaderMapper(org.springframework.jms.support)ToHeaders:57,SimpleJmsHeaderMapper(org.springframework.jms.support)ExtractHeaders:148,MessagingMessageConverter(org.springframework.jms.support.converter)Access$100:466,AbstractAdaptableMessageConverterAdapter(eargumentinternal:68,HeaderMethodArgumentResolver(org.springframework.messaging.handler.annotation.support)resolveargument:100,AbstractNamedValueMethodArgumentResolver(org.springframework.messaging.handler.annotation.support)resolveargument:117,HandlerMethodArgumentResolver(org.springframework.messaging.handler.annotation.support)framework.messaging.handler.invocation)InvokeHandler:114,MessagingMessageListenerAdapter(org.springframework.jms.listener.adapter)onMessage:77,MessagingMessageListenerAdapter(org.springframework.jms.listener.adapter)doInvokeListener:736,AbstractMessageListenerContainer(org.springframework.jms.listener)InvokeListener:696,IstenerContainer(org.springframework.jms.listener)ReceiveandExecute:257,AbstractPollingMessageListenerContainer(org.springframework.jms.listener)InvokeListener:1190,DefaultMessageListenerContainer$AsyncMessageListenerInvoker(org.springframework.jms.listener)ExecuteOnGoingLoop:1180,DefaultMessageListenerContainer.springframework.jms.listener)运行:748,Thread(java.lang)

共有1个答案

淳于哲
2023-03-14

尝试子类化SimpleJMSheaderMapper并重写ToHeaders()。调用super.toHeaders(),根据结果创建一个新的映射<>将()所需的任何其他标头放入映射中,并从映射返回一个新的MessageHeaders

将自定义映射器传递到新的MessagingMessageConverter并将其传递到容器工厂。

如果使用Spring Boot,只需将转换器添加为@bean,Boot将自动将其连接到工厂中。

@SpringBootApplication
public class So58399905Application {

    public static void main(String[] args) {
        SpringApplication.run(So58399905Application.class, args);
    }

    @JmsListener(destination = "foo")
    public void listen(String in, MessageHeaders headers) {
        System.out.println(in + headers);
    }

    @Bean
    public ApplicationRunner runner(JmsTemplate template) {
        return args -> template.convertAndSend("foo", "bar", msg -> {
            msg.setStringProperty("JMSXGroupID", "product=x");
            return msg;
        });
    }

}
bar{jms_redelivered=false, JMSXGroupID=product=x, jms_deliveryMode=2, JMSXDeliveryCount=1,  ...
 类似资料:
  • 我的代码中有一个Spring JmsListener。它接收和消费消息2天,但突然在这2天后,它没有收到来自外部Activemq的消息。然而,它的队列中有一些挂起的消息。当我重置Activemq和消费者时,消费者会收到大量消息。当消息挂起时,连接到Activemq的消费者(代表Spring致动器日志)。日志和配置显示Activemq没有将消息推送给消费者。我有另一个像这个消费者一样的服务从其他队列

  • 我有一个应用程序,在本地工作很好,但在客户端,它崩溃,因为一个POST参数是NULL。 此返回为NULL。 因此,我添加了一个过滤器来记录请求的get/post/body。在本地,日志如下所示: [方法:post][请求URI://peps/zkau][请求参数:{dtid=z_0n8,uuid_0=x38pz,data_0={“pagex”:372,“pagey”:103,“which”:1,“

  • 我使用什么数据类型在协议缓冲区消息中存储单个字节?看到https://developers.google.com/protocol-buffers/docs/proto#scalar的列表,似乎*int32类型之一最合适。有没有更有效的方法来存储单个字节?

  • 问题内容: 我正在使用Laravel Mail函数发送电子邮件。以下是我的文件设置。 控制器邮件方法 当我运行代码时,它给我以下错误消息: Swift_TransportException 预期的响应代码为220,但得到的代码为“”,并带有消息“” 我已经在视图中创建了一个包含一些数据的文件。 如何解决此错误消息? 问题答案: 如果您未启用要用于发送的帐户的两步验证(可以在此处完成),则通常会出现

  • 我对ActiveMQ有一个奇怪的问题。我有一个队列,似乎有一个挂起的消息,但当我打开队列时,没有消息。 这里怎么了?真的有消息等待处理吗?我怎样才能把信息带回来,或者至少能看到内容? 编辑:刚刚发现ActiveMQ 5.6.0的这两个错误。这可能是那个问题的根源吗? 不正确的报告挂起QueueSize的持久子后重新连接与未破解 OrderPendingList中的问题可能导致在持久子重新连接后无法

  • 问题内容: 我试图在通过新的Django消息框架显示的消息中显示一些html。具体来说,我是通过ModelAdmin.message_user方法执行此操作的,该方法只是围绕messages()的一个瘦包装: 到目前为止,我尝试过的所有内容似乎都显示了转义的HTML。 不起作用,也不会: admin base.html模板中模板代码的显示非常简单: 因此,我不确定自己在做什么错。 问题答案: 另一