消息组中的消息共享相同的组id,即它们具有相同的组标识符属性(JMS为JMSXGroupID,Apache ActiveMQ Artemis Core API为_AMQ_GROUP_ID)。
当我浏览值为product=paper的代理中的消息时,我可以看到为什么最初通过jmsxgroupid
设置的属性变成了_amq_group_id
。但是,在我的@jmslistener
注释方法中,我可以看到_amq_group_id
属性丢失,并且jmsxgroupid
在消息的headers
hashmap中显示为null。
@JmsListener(destination = "${artemis.destination}", subscription = "${artemis.subscriptionName}",
containerFactory = "containerFactory", concurrency = "15-15")
public void consumeMessage(Message<StatefulSpineEvent<?>> eventMessage)
所以
@Component
public class GroupIdMessageMapper extends SimpleJmsHeaderMapper {
@Override
public MessageHeaders toHeaders(Message jmsMessage) {
MessageHeaders messageHeaders = super.toHeaders(jmsMessage);
Map<String, Object> messageHeadersMap = new HashMap<>(messageHeaders);
try {
messageHeadersMap.put("JMSXGroupID", jmsMessage.getStringProperty("_AMQ_GROUP_ID"));
} catch (JMSException e) {
e.printStackTrace();
}
// can see while debugging that this returns the correct headers
return new MessageHeaders(messageHeadersMap);
}
}
@Component
public class CustomSpringJmsListener {
protected final Logger LOG = LoggerFactory.getLogger(getClass());
@JmsListener(destination = "local-queue", subscription = "groupid-example",
containerFactory = "myContainerFactory", concurrency = "15-15")
public void receive(Message message) throws JMSException {
LOG.info("Received message: " + message);
}
}
@SpringBootApplication
@EnableJms
public class GroupidApplication implements CommandLineRunner {
private static Logger LOG = LoggerFactory
.getLogger(GroupidApplication.class);
@Autowired
private JmsTemplate jmsTemplate;
@Autowired MessageConverter messageConverter;
public static void main(String[] args) {
LOG.info("STARTING THE APPLICATION");
SpringApplication.run(GroupidApplication.class, args);
LOG.info("APPLICATION FINISHED");
}
@Override
public void run(String... args) {
LOG.info("EXECUTING : command line runner");
jmsTemplate.setPubSubDomain(true);
createAndSendObjectMessage("Message1");
createAndSendTextMessage("Message2");
createAndSendTextMessage("Message3");
createAndSendTextMessage("Message4");
createAndSendTextMessage("Message5");
createAndSendTextMessage("Message6");
}
private void createAndSendTextMessage(String messageBody) {
jmsTemplate.send("local-queue", session -> {
Message message = session.createTextMessage(messageBody);
message.setStringProperty("JMSXGroupID", "product=paper");
return message;
});
}
// BEANS
@Bean
public JmsListenerContainerFactory<?> myContainerFactory(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
// This provides all boot's default to this factory, including the message converter
configurer.configure(factory, connectionFactory);
// You could still override some of Boot's default if necessary.
factory.setSubscriptionDurable(true);
factory.setSubscriptionShared(true);
factory.setMessageConverter(messagingMessageConverter());
return factory;
}
@Bean
public MessagingMessageConverter messagingMessageConverter() {
return new MessagingMessageConverter(messageConverter, new GroupIdMessageMapper());
}
}
SimpleJmsHeaderMapper调用位置的堆栈跟踪:
ToHeaders:130,SimpleJmsHeaderMapper(org.springframework.jms.support)ToHeaders:57,SimpleJmsHeaderMapper(org.springframework.jms.support)ExtractHeaders:148,MessagingMessageConverter(org.springframework.jms.support.converter)Access$100:466,AbstractAdaptableMessageConverterAdapter(eargumentinternal:68,HeaderMethodArgumentResolver(org.springframework.messaging.handler.annotation.support)resolveargument:100,AbstractNamedValueMethodArgumentResolver(org.springframework.messaging.handler.annotation.support)resolveargument:117,HandlerMethodArgumentResolver(org.springframework.messaging.handler.annotation.support)framework.messaging.handler.invocation)InvokeHandler:114,MessagingMessageListenerAdapter(org.springframework.jms.listener.adapter)onMessage:77,MessagingMessageListenerAdapter(org.springframework.jms.listener.adapter)doInvokeListener:736,AbstractMessageListenerContainer(org.springframework.jms.listener)InvokeListener:696,IstenerContainer(org.springframework.jms.listener)ReceiveandExecute:257,AbstractPollingMessageListenerContainer(org.springframework.jms.listener)InvokeListener:1190,DefaultMessageListenerContainer$AsyncMessageListenerInvoker(org.springframework.jms.listener)ExecuteOnGoingLoop:1180,DefaultMessageListenerContainer.springframework.jms.listener)运行:748,Thread(java.lang)
尝试子类化SimpleJMSheaderMapper
并重写ToHeaders()
。调用super.toHeaders()
,根据结果创建一个新的映射<>
;将()
所需的任何其他标头放入映射中,并从映射返回一个新的MessageHeaders
。
将自定义映射器传递到新的MessagingMessageConverter
并将其传递到容器工厂。
如果使用Spring Boot,只需将转换器添加为@bean
,Boot将自动将其连接到工厂中。
@SpringBootApplication
public class So58399905Application {
public static void main(String[] args) {
SpringApplication.run(So58399905Application.class, args);
}
@JmsListener(destination = "foo")
public void listen(String in, MessageHeaders headers) {
System.out.println(in + headers);
}
@Bean
public ApplicationRunner runner(JmsTemplate template) {
return args -> template.convertAndSend("foo", "bar", msg -> {
msg.setStringProperty("JMSXGroupID", "product=x");
return msg;
});
}
}
bar{jms_redelivered=false, JMSXGroupID=product=x, jms_deliveryMode=2, JMSXDeliveryCount=1, ...
我的代码中有一个Spring JmsListener。它接收和消费消息2天,但突然在这2天后,它没有收到来自外部Activemq的消息。然而,它的队列中有一些挂起的消息。当我重置Activemq和消费者时,消费者会收到大量消息。当消息挂起时,连接到Activemq的消费者(代表Spring致动器日志)。日志和配置显示Activemq没有将消息推送给消费者。我有另一个像这个消费者一样的服务从其他队列
我有一个应用程序,在本地工作很好,但在客户端,它崩溃,因为一个POST参数是NULL。 此返回为NULL。 因此,我添加了一个过滤器来记录请求的get/post/body。在本地,日志如下所示: [方法:post][请求URI://peps/zkau][请求参数:{dtid=z_0n8,uuid_0=x38pz,data_0={“pagex”:372,“pagey”:103,“which”:1,“
我使用什么数据类型在协议缓冲区消息中存储单个字节?看到https://developers.google.com/protocol-buffers/docs/proto#scalar的列表,似乎*int32类型之一最合适。有没有更有效的方法来存储单个字节?
问题内容: 我正在使用Laravel Mail函数发送电子邮件。以下是我的文件设置。 控制器邮件方法 当我运行代码时,它给我以下错误消息: Swift_TransportException 预期的响应代码为220,但得到的代码为“”,并带有消息“” 我已经在视图中创建了一个包含一些数据的文件。 如何解决此错误消息? 问题答案: 如果您未启用要用于发送的帐户的两步验证(可以在此处完成),则通常会出现
我对ActiveMQ有一个奇怪的问题。我有一个队列,似乎有一个挂起的消息,但当我打开队列时,没有消息。 这里怎么了?真的有消息等待处理吗?我怎样才能把信息带回来,或者至少能看到内容? 编辑:刚刚发现ActiveMQ 5.6.0的这两个错误。这可能是那个问题的根源吗? 不正确的报告挂起QueueSize的持久子后重新连接与未破解 OrderPendingList中的问题可能导致在持久子重新连接后无法
问题内容: 我试图在通过新的Django消息框架显示的消息中显示一些html。具体来说,我是通过ModelAdmin.message_user方法执行此操作的,该方法只是围绕messages()的一个瘦包装: 到目前为止,我尝试过的所有内容似乎都显示了转义的HTML。 不起作用,也不会: admin base.html模板中模板代码的显示非常简单: 因此,我不确定自己在做什么错。 问题答案: 另一