How can I convert the following code to use the Spring framework?
ConnectionFactory factory = new ConnectionFactory();
factory.setExceptionHandler(new BrokerExceptionHandler(logger, instance));
public final class BrokerExceptionHandler extends StrictExceptionHandler {
@Override
public void handleReturnListenerException(Channel channel, Throwable exception) {
logger.log(Level.SEVERE, "ReturnListenerException detected: ReturnListener.handleReturn", exception);
this.publishAlert(exception, "ReturnListener.handleReturn");
logger.log(Level.SEVERE, "Close application", exception);
System.exit(-1);
}
....
}
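Basically, I need to register a custom exception handler that stops the application whenever a RabbitMQ exception occurs.
How can I also publish a RabbitMQ message each time an exception occurs?
EDIT
I modified my configuration class this way: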
@Bean
SimpleMessageListenerContainer containerPredict(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerPredictAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setDefaultRequeueRejected(false);
container.setErrorHandler(new BrokerExceptionHandler());
container.setQueueNames(getQueueName());
container.setMessageListener(listenerPredictAdapter);
return container;
}
This is my BrokerExceptionHandler class:
public class BrokerExceptionHandler implements ErrorHandler {
private final Logger logger = Logger.getLogger(getClass().getSimpleName());
@Autowired
private Helper helper;
@Override
public void handleError(Throwable t) {
logger.log(Level.SEVERE, "Exception Detected. Publishing error alert");
String message = "Exception detected. Message: " + t.getMessage();
// Notify the error to the System sending a new RabbitMq message
System.out.println("---> Before convertAndSend");
rabbitTemplate.convertAndSend(exchange, routing, message);
System.out.println("---> After convertAndSend");
}
}
I can see the log message "Exception Detected. Publishing error alert"
and "---".
This is the log:
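2018-10-17 09:32:02.849 ERROR 1506 --- [tainer****-1] BrokerExceptionHandler : Exception Detected. Publishing error alert
---
2018-10-17 09:32:02.853 INFO 1506 --- [tainer****-1] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@4f5b08d: tags=[{amq.ctag-yUcUmg5BCo20ucG1wJZoWA=myechange}], channel=Cached Rabbit Channel: AMQChannel(amqp://admin@XXX.XXX.XXX.XXX:5672/testbed_simulator,1), conn: Proxy@3964d79 Shared Rabbit Connection: SimpleConnection@61f39bb [delegate=amqp://admin@XXX.XXX.XXX.XXX:5672/testbed_simulator, localPort= 51528], acknowledgeMode=AUTO local queue size=0
2018-10-17 09:32:02.905 INFO 1506 --- [tainer****-2] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable, auto-delete, or exclusive Queue (myexchange) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
2018-10-17 09:32:02.905 INFO 1506 --- [tainer****-2] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable, auto-delete, or exclusive Queue (myexchange) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.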
EDIT
While debugging, I see that the following code is called before the new message is sent:
File: SimpleMessageListenerContainer.class, line 1212
if (!isActive(this.consumer) || aborted) {
.....
}
else {
---> logger.info("Restarting " + this.consumer);
restart(this.consumer);
}
EDIT 2
Sample code: http://github.com/fabry00/spring-boot-rabbitmq
It depends on how you are doing your configuration; if you are using Spring Boot's auto-configured connection factory...
@Bean
public InitializingBean connectionFactoryConfigurer(CachingConnectionFactory ccf) {
return () -> ccf.getRabbitConnectionFactory().setExceptionHandler(...);
}
If you are wiring up your own beans (e.g. via a RabbitConnectionFactoryBean), then set it directly.
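For the "wire up your own beans" case, the direct approach might look like the sketch below. It assumes you build the RabbitMQ client factory yourself and wrap it in a CachingConnectionFactory; the class name RabbitConnectionConfig and the host value are placeholders, not taken from the question, and the handler body only mirrors the original handleReturnListenerException.

```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.impl.StrictExceptionHandler;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; the name is an assumption.
@Configuration
public class RabbitConnectionConfig {

    @Bean
    public ConnectionFactory connectionFactory() {
        // The RabbitMQ client factory (not the Spring one) accepts the exception handler.
        com.rabbitmq.client.ConnectionFactory rabbitFactory =
                new com.rabbitmq.client.ConnectionFactory();
        rabbitFactory.setHost("localhost"); // placeholder, adjust to your broker

        rabbitFactory.setExceptionHandler(new StrictExceptionHandler() {
            @Override
            public void handleReturnListenerException(Channel channel, Throwable exception) {
                // Same behaviour as the original handler: report, then stop the application.
                System.err.println("ReturnListenerException detected: " + exception);
                System.exit(-1);
            }
        });

        // Spring AMQP wraps the client factory and adds connection/channel caching.
        return new CachingConnectionFactory(rabbitFactory);
    }
}
```

Note that declaring your own ConnectionFactory bean normally makes Spring Boot's auto-configured one back off, so any spring.rabbitmq.* properties you rely on would have to be applied by hand.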
EDIT
You are throwing a NullPointerException in your error handler:
...
2018-10-17 11:51:58.733 DEBUG 38975 --- [containerKpis-1] o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception, processing can restart if the connection factory supports it
java.lang.NullPointerException: null
at com.test.BrokerExceptionHandler.handleError(BrokerExceptionHandler.java:27) ~[main/:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeErrorHandler(AbstractMessageListenerContainer.java:1243) ~[spring-rabbit-2.0.6.RELEASE.jar:2.0.6.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.handleListenerException(AbstractMessageListenerContainer.java:1488) ~[spring-rabbit-2.0.6.RELEASE.jar:2.0.6.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1318) ~[spring-rabbit-2.0.6.RELEASE.jar:2.0.6.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:817) ~[spring-rabbit-2.0.6.RELEASE.jar:2.0.6.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:801) ~[spring-rabbit-2.0.6.RELEASE.jar:2.0.6.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:77) ~[spring-rabbit-2.0.6.RELEASE.jar:2.0.6.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1042) ~[spring-rabbit-2.0.6.RELEASE.jar:2.0.6.RELEASE]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
2018-10-17 11:51:58.734 INFO 38975 --- [containerKpis-1] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@1aabf50d: tags=[{amq.ctag-VxxHKiMsWI_w8DIooAsySA=myapp.mydomain.KPIS}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), conn: Proxy@b88a7d6 Shared Rabbit Connection: SimpleConnection@25dc64a [delegate=amqp://guest@127.0.0.1:5672/, localPort= 55662], acknowledgeMode=AUTO local queue size=0
To turn on DEBUG logging, add
logging.level.org.springframework.amqp=debug
to your application.properties.
helper is null because the error handler is not a Spring bean. @Autowired only works when Spring manages the object; you are using new BrokerExceptionHandler().
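As the stack trace shows, an exception thrown from inside the error handler reaches the container and the consumer is restarted. One way to make the handler more robust is constructor injection plus a guard around the publishing step. The sketch below is dependency-free so it can be run on its own: the class name SafeBrokerExceptionHandler is made up, and the Consumer<String> is a hypothetical stand-in for the Helper from the question. In the real application the class would implement org.springframework.util.ErrorHandler and be declared as a bean.

```java
import java.util.Objects;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical handler; in a Spring app it would implement org.springframework.util.ErrorHandler.
final class SafeBrokerExceptionHandler {

    private static final Logger logger =
            Logger.getLogger(SafeBrokerExceptionHandler.class.getSimpleName());

    // Stand-in for the question's Helper: anything that can publish an alert text.
    private final Consumer<String> alertPublisher;

    // Constructor injection: a missing dependency fails here, not later inside handleError.
    SafeBrokerExceptionHandler(Consumer<String> alertPublisher) {
        this.alertPublisher = Objects.requireNonNull(alertPublisher, "alertPublisher");
    }

    // Same signature as ErrorHandler#handleError(Throwable).
    public void handleError(Throwable t) {
        String message = "Exception detected. Message: " + t.getMessage();
        try {
            alertPublisher.accept(message);
        } catch (RuntimeException publishFailure) {
            // Swallow and log, so a failed alert does not escape the error handler.
            logger.log(Level.SEVERE, "Could not publish alert", publishFailure);
        }
    }
}
```

With Spring, the bean method could pass the real helper into the constructor, for example something like new SafeBrokerExceptionHandler(text -> helper.publishAlert(text)), where the exact publishAlert signature is an assumption.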
EDIT 2
I added these 2 beans
@Bean
public BrokerExceptionHandler errorHandler() {
return new BrokerExceptionHandler();
}
@Bean
public MessageConverter json() { // Boot auto-configures in template
return new Jackson2JsonMessageConverter();
}
and now...
---> Before publishing Alert event
--- ALERT
2018-10-17 12:14:45.304 INFO 43359 --- [containerKpis-1] Helper : publishAlert
2018-10-17 12:14:45.321 DEBUG 43359 --- [containerKpis-1] o.s.a.r.c.CachingConnectionFactory : Creating cached Rabbit Channel from AMQChannel(amqp://guest@127.0.0.1:5672/,3)
2018-10-17 12:14:45.321 DEBUG 43359 --- [containerKpis-1] o.s.amqp.rabbit.core.RabbitTemplate : Executing callback RabbitTemplate$$Lambda$638/975724213 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,3), conn: Proxy@77f3f419 Shared Rabbit Connection: SimpleConnection@10c86af1 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 56220]
2018-10-17 12:14:45.321 DEBUG 43359 --- [containerKpis-1] o.s.amqp.rabbit.core.RabbitTemplate : Publishing message (Body:'{"timestamp":1539792885303,"code":"ERROR","severity":"ERROR","message":"Exception detected. Message: Listener method 'kpisEvent' threw exception"}' MessageProperties [headers={sender=myapp, protocolVersion=1.0.0, senderType=MY_COMPONENT_1, __TypeId__=com.test.domain.Alert, timestamp=1539792885304}, contentType=application/json, contentEncoding=UTF-8, contentLength=146, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])on exchange [myevent.ALERT], routingKey = [/]
--- ALERT 2
---> After publishing Alert event
2018-10-17 12:14:45.323 DEBUG 43359 --- [pool-1-thread-6] o.s.a.r.listener.BlockingQueueConsumer : Storing delivery for consumerTag: 'amq.ctag-eYbzZ09pCw3cjdtSprlZMQ' with deliveryTag: '1' in Consumer@4b790d86: tags=[{amq.ctag-eYbzZ09pCw3cjdtSprlZMQ=myapp.myevent.ALERT}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2), conn: Proxy@77f3f419 Shared Rabbit Connection: SimpleConnection@10c86af1 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 56220], acknowledgeMode=AUTO local queue size=0
2018-10-17 12:14:45.324 DEBUG 43359 --- [ontainerReset-1] o.s.a.r.listener.BlockingQueueConsumer : Received message: (Body:'{"timestamp":1539792885303,"code":"ERROR","severity":"ERROR","message":"Exception detected. Message: Listener method 'kpisEvent' threw exception"}' MessageProperties [headers={sender=myapp, protocolVersion=1.0.0, senderType=MY_COMPONENT_1, __TypeId__=com.test.domain.Alert, timestamp=1539792885304}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myevent.ALERT, receivedRoutingKey=/, deliveryTag=1, consumerTag=amq.ctag-eYbzZ09pCw3cjdtSprlZMQ, consumerQueue=myapp.myevent.ALERT])
2018-10-17 12:14:45.324 INFO 43359 --- [ontainerReset-1] Application : ---> kpisAlert RECEIVED
2018-10-17 12:14:45.325 ERROR 43359 --- [ontainerReset-1] Application : ---> Message: Exception detected. Message: Listener method 'kpisEvent' threw exception
2018-10-17 12:14:45.326 DEBUG 43359 --- [containerKpis-1] o.s.a.r.listener.BlockingQueueConsumer : Rejecting messages (requeue=false)
EDIT
Or, if you prefer Gson...
@Bean
public MessageConverter json() {
Gson gson = new GsonBuilder().create();
return new MessageConverter() {
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
return new Message(gson.toJson(object).getBytes(), messageProperties);
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
throw new UnsupportedOperationException();
}
};
}
EDIT 4
I changed the current version of your app as follows:
@Bean
public MessageConverter jsonConverter() {
Gson gson = new GsonBuilder().create();
EventKpisCollected collected = new EventKpisCollected();
return new MessageConverter() {
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
System.out.println("toMessage");
return new Message(gson.toJson(object).getBytes(), messageProperties);
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
System.out.println("fromMessage");
return collected.decode(new String(message.getBody()));
}
};
}
...
@Bean
SimpleMessageListenerContainer containerKpis(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerKpisAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setDefaultRequeueRejected(false);
container.setErrorHandler(errorHandler());
container.setQueueNames(getQueueKpis());
container.setMessageListener(listenerKpisAdapter);
return container;
}
@Bean
SimpleMessageListenerContainer containerReset(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAlertAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setDefaultRequeueRejected(false);
container.setErrorHandler(errorHandler());
container.setQueueNames(getQueueAlert());
container.setMessageListener(listenerAlertAdapter);
return container;
}
@Bean
MessageListenerAdapter listenerKpisAdapter(Application receiver) {
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(receiver, "kpisEvent");
messageListenerAdapter.setMessageConverter(jsonConverter());
return messageListenerAdapter;
}
@Bean
MessageListenerAdapter listenerAlertAdapter(Application receiver) {
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(receiver, "alertEvent");
// messageListenerAdapter.setMessageConverter(jsonConverter()); converter only handles events.
return messageListenerAdapter;
}
and
fromMessage
2018-10-19 13:46:53.734 INFO 10725 --- [containerKpis-1] Application : ---> kpisEvent RECEIVED
2018-10-19 13:46:53.734 INFO 10725 --- [containerKpis-1] Application : ---> kpisEvent DECODED, windowId: 1522751098000-1522752198000
The event decoding is now done by the framework (only for the current event; you will need a second converter to handle alerts).
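A second converter for the alerts could follow the same pattern as jsonConverter(). The sketch below is only an illustration: the nested Alert class is a hypothetical stand-in whose fields are guessed from the JSON body in the log above (timestamp, code, severity, message), and the names AlertConverterConfig and alertConverter are assumptions.

```java
import com.google.gson.Gson;
import java.nio.charset.StandardCharsets;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; names are assumptions.
@Configuration
public class AlertConverterConfig {

    // Stand-in for the application's own alert type; fields mirror the logged JSON.
    public static class Alert {
        long timestamp;
        String code;
        String severity;
        String message;
    }

    @Bean
    public MessageConverter alertConverter() {
        Gson gson = new Gson();
        return new MessageConverter() {

            @Override
            public Message toMessage(Object object, MessageProperties messageProperties)
                    throws MessageConversionException {
                // Serialize the outgoing object to UTF-8 JSON bytes.
                byte[] body = gson.toJson(object).getBytes(StandardCharsets.UTF_8);
                return new Message(body, messageProperties);
            }

            @Override
            public Object fromMessage(Message message) throws MessageConversionException {
                // Decode the incoming body into the alert type.
                String json = new String(message.getBody(), StandardCharsets.UTF_8);
                return gson.fromJson(json, Alert.class);
            }
        };
    }
}
```

It would then be set on the alert adapter with setMessageConverter(alertConverter()), and the alertEvent method would receive an Alert instead of the raw payload. With more than one MessageConverter bean in the context, Boot may no longer choose one for the auto-configured RabbitTemplate, so it is probably safer to set the template's converter explicitly.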