我正在尝试从XMLSpringAMQP配置迁移到基于java注释的配置,因为它“更简单”。不确定我做错了什么XML配置工作正常,但java@Configurable抛出了一个“由:java.net.SocketException:Connection reset引起的”异常。
XML配置(完美工作):
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- define which properties files will be used -->
<context:property-placeholder location="classpath:*.properties" />
<rabbit:connection-factory id="connectionFactory"
addresses='${rabbitmq.hostname}'
username='${rabbitmq.username}'
password='${rabbitmq.password}'
virtual-host='${rabbitmq.virtual_host}'
cache-mode='${rabbitmq.cache_mode}'
channel-cache-size='${rabbitmq.channel_cache_size}'/>
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="3"/>
<property name="maxPoolSize" value="5"/>
<property name="queueCapacity" value="15"/>
</bean>
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:queue name="${rabbitmq.queue_name}" />
<rabbit:topic-exchange name="${rabbitmq.topic_exchange_name}">
<rabbit:bindings>
<rabbit:binding queue="${rabbitmq.queue_name}" pattern="${rabbitmq.topic_exchange_pattern}"/>
</rabbit:bindings>
</rabbit:topic-exchange>
<bean id="listener" class="com.my.package.path.worker.DefaultMessageListener"/>
<rabbit:listener-container id="listenerContainer" connection-factory="connectionFactory" task-executor="taskExecutor">
<rabbit:listener ref="listener" queues="notification.main" />
</rabbit:listener-container>
</beans>
Java配置:
@Configurable
@PropertySource("classpath:rabbitmq.properties")
public class RabbitMQConfig {
@Value("${rabbitmq.hostname}")
private String hostname;
@Value("${rabbitmq.port}")
private String port;
@Value("${rabbitmq.username}")
private String username;
@Value("${rabbitmq.password}")
private String password;
@Value("${rabbitmq.virtual_host}")
private String virtualHost;
//@Value("${rabbitmq.cache_mode}")
//private String cacheMode;
@Value("${rabbitmq.channel_cache_size}")
private String channelCacheSize;
@Value("${rabbitmq.topic_exchange_name}")
private String topicExchangeName;
@Value("${rabbitmq.topic_exchange_pattern}")
private String topicExchangePattern;
@Value("${rabbitmq.queue_name}")
private String queueName;
@Autowired
private ConnectionFactory cachingConnectionFactory;
@Bean(name="cachingConnectionFactory")
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(hostname,Integer.valueOf(port));
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
//connectionFactory.setCacheMode(CacheMode.valueOf(cacheMode));
connectionFactory.setChannelCacheSize(Integer.valueOf( channelCacheSize ));
return connectionFactory;
}
@Bean(name="taskExecutor")
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor tpte = new ThreadPoolTaskExecutor();
tpte.setCorePoolSize(3);
tpte.setMaxPoolSize(5);
tpte.setQueueCapacity(15);
return tpte;
}
@Bean
public AmqpTemplate AmqpTemplate() {
RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory);
return template;
}
@Bean
public AmqpAdmin amqpAdmin() {
RabbitAdmin amqpAdmin = new RabbitAdmin(cachingConnectionFactory);
return amqpAdmin;
}
@Bean
public Queue queue() {
return new Queue(queueName);
}
@Bean
public TopicExchange topicExchange() {
TopicExchange topicExchange = new TopicExchange(topicExchangeName);
return topicExchange;
}
@Bean
public Binding dataBinding(TopicExchange topicExchange, Queue queue) {
return BindingBuilder.bind(queue).to(topicExchange).with(topicExchangePattern);
}
@Bean
public DefaultMessageListener defaultMessageListener() {
return new DefaultMessageListener();
}
@Bean
public SimpleMessageListenerContainer container(DefaultMessageListener defaultMessageListener) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(cachingConnectionFactory);
container.setQueueNames(queueName);
container.setAutoStartup(true);
container.setMessageListener(defaultMessageListener);
//container.setTaskExecutor(taskExecutor);
return container;
}
@Bean
public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() {
return new PropertySourcesPlaceholderConfigurer();
}
java配置错误:
INFO : org.springframework.context.support.DefaultLifecycleProcessor - Starting beans in phase 2147483647
DEBUG: org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - No global properties bean
DEBUG: org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - Starting Rabbit listener container.
ERROR: org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - Failed to check/redeclare auto-delete queue(s).
org.springframework.amqp.AmqpIOException: java.io.IOException
at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:63)
at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:217)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:444)
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils$1.createConnection(ConnectionFactoryUtils.java:80)
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.doGetTransactionalResourceHolder(ConnectionFactoryUtils.java:130)
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactoryUtils.java:67)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1035)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1028)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1004)
at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:254)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.redeclareElementsIfNecessary(SimpleMessageListenerContainer.java:963)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$300(SimpleMessageListenerContainer.java:83)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1081)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:376)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:603)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:637)
at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:208)
... 12 more
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:348)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:221)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 16 more
Caused by: java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:209)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288)
at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95)
at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:139)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:534)
... 1 more
我进入了Spring amqp代码,罪魁祸首是RabbitAdmin#getQueueProperties方法。在XML配置中,它执行良好。。。但是,一旦使用java配置执行,它就会抛出上面的异常?我所做的是不同的?两个配置在我看来都一样。
package org.springframework.amqp.rabbit.core;
public class RabbitAdmin implements AmqpAdmin, ApplicationContextAware, InitializingBean {
//...
@Override
public Properties getQueueProperties(final String queueName) {
Assert.hasText(queueName, "'queueName' cannot be null or empty");
return this.rabbitTemplate.execute(new ChannelCallback<Properties>() {
@Override
public Properties doInRabbit(Channel channel) throws Exception {
try {
DeclareOk declareOk = channel.queueDeclarePassive(queueName);
Properties props = new Properties();
props.put(QUEUE_NAME, declareOk.getQueue());
props.put(QUEUE_MESSAGE_COUNT, declareOk.getMessageCount());
props.put(QUEUE_CONSUMER_COUNT, declareOk.getConsumerCount());
return props;
}
catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug("Queue '" + queueName + "' does not exist");
}
return null;
}
}
});
}
}
两个配置使用完全相同的rabbitmq。类路径上的属性文件。我甚至在运行时检查了两个配置的RabbitAdmin和RabbitTemplate类的属性,它们看起来完全相同。。。
我没有使用“/”根虚拟主机。我有自己的虚拟主机自定义值。尽管我确实通过spel将这个属性注入到java配置中,但我没有在connectionFactory上显式地设置它。
connectionFactory.setVirtualHost(virtualHost);
感谢@Gary Russell帮我解决问题。
@Bean(name="cachingConnectionFactory")
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(hostname,Integer.valueOf(port));
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setChannelCacheSize(Integer.valueOf( channelCacheSize ));
return connectionFactory;
}
您应该使用@Configuration
,而不是@Configurable
。
编辑:
看起来Rabbitmq服务器正在关闭连接:
Caused by: java.net.SocketException: Connection reset
查看服务器日志;如果没有帮助,请在某个地方发布一个完整的org.springframework
的DEBUG日志(可能对这里来说太大了)。
编辑2:
您有身份验证问题。。。
{handshake_error,opening,0,
{amqp_error,access_refused,
"access to vhost '/' refused for user 'gggdw'",
'connection.open'}}
...检查您的用户名和密码(以及vhost)。
我正在从GlassFish4迁移到Wildfly8.1。 顺便说一句,Mojarra版本是2.2.6,所以这里不应该有这样的问题
问题内容: 我正在尝试将JUnit和Spring结合在一个Web项目中。由于这个问题,我已成功将spring config文件加载到TestClass中: 但是当测试运行时,出现以下错误: 由以下原因引起:org.springframework.beans.factory.parsing.BeanDefinitionParsingException:配置问题:无法找到XML模式命名空间的Sprin
问题内容: 我有以下问题,我需要删除“ cfdi”的“ cfdi_”: 我使用了命名空间来解决这个问题,但是每个节点都重复了这些命名空间,因此无法消除它们,如果您能帮助我,我将不胜感激 结果1: 第二次尝试: 结果: 我的代码更大,并且重复了太多次,我不知道如何消除它们 问题答案: 从XML来看,第二个结果是自给自足的XML元素,而第一个结果只能是定义前缀名称空间的其他元素的一部分。因此,我想所有
本文向大家介绍xpath 示例XML(无名称空间),包括了xpath 示例XML(无名称空间)的使用技巧和注意事项,需要的朋友参考一下 示例 这是一些示例XML,可以针对这些XML编写示例XPath:
我一直使用hibernate。cfg。配置Hibernate的xml。由于各种原因,我需要将所有配置转移到Java,并且我在映射实体方面存在问题。 以前在XML中,我的映射是这样的: 一切都是注释驱动的,所以我不需要指定其他任何东西。 现在,我使用Hibernate的配置来配置一切。类,如何添加映射?我尝试了配置。addClass(Test.class)但这会引发一个错误,即file 但是在第一次
问题内容: 我的设置非常简单:我有一个Web前端,后端是弹簧接线的。 我正在使用AOP在rpc服务上增加一层安全性。 一切都很好,除了Web应用程序在启动时中止的事实: 这是我的xml配置文件中的代码段: 我在互联网上读到,这可能是我上课负担问题的核心。令人怀疑的是,因为这是我的WEB-INF / lib目录: 问题答案: 您是否尝试过将所有罐子直接放在目录中而不是其子目录中? 不,只是 其余的罐