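I have a problem with the ActiveMQ failover transport. I am using Spring (3.0.5) and ActiveMQ (5.2.0), and I want to use an ActiveMQ topic to broadcast some messages. My configuration is as follows:
jms-context.xml
I create a simple ActiveMQConnectionFactory that uses async send and wrap it in a PooledConnectionFactory so that the JmsTemplate can later reuse pooled connections.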
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd
           http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.2.0.xsd
       ">
    <!-- JMS -->
    <amq:connectionFactory id="jmsTopicAsyncConnectionFactory" brokerURL="${jms.url}">
        <property name="useAsyncSend" value="true"/>
    </amq:connectionFactory>
    <bean id="pooledJmsConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
        <constructor-arg ref="jmsTopicAsyncConnectionFactory" />
    </bean>
</beans>
JmsConfig.java
In the Java configuration, I define a JmsTemplate instance that uses the pooled connection factory.
import javax.jms.ConnectionFactory;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.ImportResource;
import org.springframework.context.annotation.Scope;
import org.springframework.jms.core.JmsTemplate;

@Configuration
@ImportResource("classpath:jms-context.xml")
public class JmsConfig {

    /**
     * Used for sending messages to broadcast topics.
     *
     * @param pooledJmsConnectionFactory JMS connection factory.
     * @return JMS template instance.
     */
    @Bean
    @Autowired
    @Scope(value=BeanDefinition.SCOPE_PROTOTYPE)
    public JmsTemplate jmsTemplate(@Qualifier("pooledJmsConnectionFactory") ConnectionFactory pooledJmsConnectionFactory) {
        return new JmsTemplate(pooledJmsConnectionFactory);
    }
}
Broadcaster.java
Then I inject the previously created JmsTemplate into the broadcaster class.
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class ConcreteBroadcaster implements Broadcaster {

    private Logger log = LoggerFactory.getLogger(ConcreteBroadcaster.class);

    @Autowired
    private JmsTemplate template;

    private final Destination destination;

    public ConcreteBroadcaster(Destination destination) {
        this.destination = destination;
    }

    /**
     * Broadcasts event.
     */
    @Override
    public void broadcast(final Event event) {
        template.send(destination, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                final Message eventMessage = session.createObjectMessage(event);
                if (log.isDebugEnabled()) {
                    log.debug("Broadcasting event {} through JMS topic {}", event.getClass().getSimpleName(), destination);
                }
                return eventMessage;
            }
        });
    }
}
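With only one ActiveMQ broker, everything works fine. When I change my jms.url from:
jms.url:tcp://10.0.0.1:61616
to:
jms.url:"failover:(tcp://10.0.0.1:61616,tcp://10.0.0.2:61616)?randomize=false"
and run two ActiveMQ brokers with the default configuration on the given hosts, my application starts and logs the following: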
16 Feb 2015 16:37:40,491 INFO ache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://10.0.0.1:61616
16 Feb 2015 16:37:41,499 INFO ache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://10.0.0.1:61616
16 Feb 2015 16:37:42,508 INFO ache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://10.0.0.1:61616
16 Feb 2015 16:37:43,514 INFO ache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://10.0.0.1:61616
16 Feb 2015 16:37:44,522 INFO ache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://10.0.0.1:61616
So it appears that a reconnect happens every second. With debug logging enabled, it looks like this:
16 Feb 2015 19:46:25,050 DEBUG ache.activemq.transport.failover.FailoverTransport - urlList connectionList:[tcp://10.0.0.1:61616, tcp://10.0.0.2:61616], from: [tcp://10.0.0.1:61616, tcp://10.0.0.2:61616]
16 Feb 2015 19:46:25,051 DEBUG ache.activemq.transport.failover.FailoverTransport - Attempting 0th connect to: tcp://10.0.0.1:61616
16 Feb 2015 19:46:25,053 DEBUG org.apache.activemq.transport.WireFormatNegotiator - Sending: WireFormatInfo { version=9, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
16 Feb 2015 19:46:25,053 DEBUG ache.activemq.transport.failover.FailoverTransport - Connection established
16 Feb 2015 19:46:25,053 INFO ache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://10.0.0.1:61616
16 Feb 2015 19:46:25,056 DEBUG org.apache.activemq.transport.InactivityMonitor - Using min of local: WireFormatInfo { version=9, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=5, properties={CacheSize=1024, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, CacheEnabled=true, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
16 Feb 2015 19:46:25,056 DEBUG org.apache.activemq.transport.WireFormatNegotiator - Received WireFormat: WireFormatInfo { version=5, properties={CacheSize=1024, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, CacheEnabled=true, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
16 Feb 2015 19:46:25,056 DEBUG org.apache.activemq.transport.WireFormatNegotiator - tcp:///10.0.0.1:61616@40113 before negotiation: OpenWireFormat{version=9, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
16 Feb 2015 19:46:25,057 DEBUG org.apache.activemq.transport.WireFormatNegotiator - tcp:///10.0.0.1:61616@40113 after negotiation: OpenWireFormat{version=5, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
16 Feb 2015 19:46:25,064 DEBUG org.apache.activemq.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Session Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@46abe3f3[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
16 Feb 2015 19:46:26,065 DEBUG org.apache.activemq.ActiveMQMessageConsumer - remove: ID:dev.vagrant.local-33222-1424101620085-1:6862:1:1, lastDeliveredSequenceId:0
16 Feb 2015 19:46:26,066 DEBUG org.apache.activemq.ActiveMQSession - ID:dev.vagrant.local-33222-1424101620085-1:6862:1 Transaction Commit :null
16 Feb 2015 19:46:26,068 DEBUG org.apache.activemq.util.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@46abe3f3[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1] is shutdown: true and terminated: true took: 0.000 seconds.
16 Feb 2015 19:46:26,069 DEBUG org.apache.activemq.util.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@228b3f4c[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] is shutdown: true and terminated: true took: 0.000 seconds.
16 Feb 2015 19:46:26,069 DEBUG ache.activemq.transport.failover.FailoverTransport - Stopped tcp://10.0.0.1:61616
16 Feb 2015 19:46:26,070 DEBUG org.apache.activemq.util.ThreadPoolUtils - Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@565b26a9[Running, pool size = 2, active threads = 0, queued tasks = 0, completed tasks = 2]
16 Feb 2015 19:46:26,071 DEBUG org.apache.activemq.transport.tcp.TcpTransport - Stopping transport tcp:///10.0.0.1:61616@40113
16 Feb 2015 19:46:26,071 DEBUG org.apache.activemq.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@74e8dd0c[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
16 Feb 2015 19:46:26,072 DEBUG org.apache.activemq.transport.tcp.TcpTransport$1 - Closed socket Socket[addr=/10.0.0.1,port=61616,localport=40113]
16 Feb 2015 19:46:26,072 DEBUG org.apache.activemq.util.ThreadPoolUtils - Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@74e8dd0c[Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]
16 Feb 2015 19:46:26,073 DEBUG org.apache.activemq.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@3a0a1c78[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
16 Feb 2015 19:46:26,074 DEBUG ache.activemq.transport.failover.FailoverTransport - Reconnect was triggered but transport is not started yet. Wait for start to connect the transport.
16 Feb 2015 19:46:26,074 DEBUG ache.activemq.transport.failover.FailoverTransport - Started unconnected
16 Feb 2015 19:46:26,074 DEBUG ache.activemq.transport.failover.FailoverTransport - Waking up reconnect task
16 Feb 2015 19:46:26,075 DEBUG ache.activemq.transport.failover.FailoverTransport - urlList connectionList:[tcp://10.0.0.1:61616, tcp://10.0.0.2:61616], from: [tcp://10.0.0.1:61616, tcp://10.0.0.2:61616]
16 Feb 2015 19:46:26,076 DEBUG ache.activemq.transport.failover.FailoverTransport - Attempting 0th connect to: tcp://10.0.0.1:61616
16 Feb 2015 19:46:26,077 DEBUG org.apache.activemq.transport.WireFormatNegotiator - Sending: WireFormatInfo { version=9, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
16 Feb 2015 19:46:26,077 DEBUG ache.activemq.transport.failover.FailoverTransport - Connection established
16 Feb 2015 19:46:26,077 INFO ache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://10.0.0.1:61616
16 Feb 2015 19:46:26,082 DEBUG org.apache.activemq.transport.InactivityMonitor - Using min of local: WireFormatInfo { version=9, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=5, properties={CacheSize=1024, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, CacheEnabled=true, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
16 Feb 2015 19:46:26,083 DEBUG org.apache.activemq.transport.WireFormatNegotiator - Received WireFormat: WireFormatInfo { version=5, properties={CacheSize=1024, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, CacheEnabled=true, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
16 Feb 2015 19:46:26,083 DEBUG org.apache.activemq.transport.WireFormatNegotiator - tcp:///10.0.0.1:61616@40114 before negotiation: OpenWireFormat{version=9, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
16 Feb 2015 19:46:26,083 DEBUG org.apache.activemq.transport.WireFormatNegotiator - tcp:///10.0.0.1:61616@40114 after negotiation: OpenWireFormat{version=5, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
16 Feb 2015 19:46:26,093 DEBUG org.apache.activemq.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Session Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@6d0c65b0[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
16 Feb 2015 19:46:27,094 DEBUG org.apache.activemq.ActiveMQMessageConsumer - remove: ID:dev.vagrant.local-33222-1424101620085-1:6863:1:1, lastDeliveredSequenceId:0
16 Feb 2015 19:46:27,096 DEBUG org.apache.activemq.ActiveMQSession - ID:dev.vagrant.local-33222-1424101620085-1:6863:1 Transaction Commit :null
16 Feb 2015 19:46:27,098 DEBUG org.apache.activemq.util.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@6d0c65b0[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1] is shutdown: true and terminated: true took: 0.000 seconds.
16 Feb 2015 19:46:27,098 DEBUG org.apache.activemq.util.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@76d73f47[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] is shutdown: true and terminated: true took: 0.000 seconds.
16 Feb 2015 19:46:27,099 DEBUG ache.activemq.transport.failover.FailoverTransport - Stopped tcp://10.0.0.1:61616
16 Feb 2015 19:46:27,100 DEBUG org.apache.activemq.transport.tcp.TcpTransport - Stopping transport tcp:///10.0.0.1:61616@40114
16 Feb 2015 19:46:27,100 DEBUG org.apache.activemq.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@1ffc55fd[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
16 Feb 2015 19:46:27,101 DEBUG org.apache.activemq.util.ThreadPoolUtils - Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@3a0a1c78[Running, pool size = 2, active threads = 0, queued tasks = 0, completed tasks = 2]
16 Feb 2015 19:46:27,102 DEBUG org.apache.activemq.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@5a12ff9c[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
16 Feb 2015 19:46:27,103 DEBUG ache.activemq.transport.failover.FailoverTransport - Reconnect was triggered but transport is not started yet. Wait for start to connect the transport.
16 Feb 2015 19:46:27,103 DEBUG ache.activemq.transport.failover.FailoverTransport - Started unconnected
16 Feb 2015 19:46:27,104 DEBUG ache.activemq.transport.failover.FailoverTransport - Waking up reconnect task
16 Feb 2015 19:46:27,103 DEBUG org.apache.activemq.transport.tcp.TcpTransport$1 - Closed socket Socket[addr=/10.0.0.1,port=61616,localport=40114]
16 Feb 2015 19:46:27,104 DEBUG org.apache.activemq.util.ThreadPoolUtils - Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@1ffc55fd[Running, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 1]
16 Feb 2015 19:46:27,105 WARN ache.activemq.transport.failover.FailoverTransport - Transport (tcp://10.0.0.1:61616) failed, reason: java.io.EOFException, not attempting to automatically reconnect
16 Feb 2015 19:46:27,105 DEBUG org.apache.activemq.ActiveMQConnection - transport interrupted, dispatchers: 0
16 Feb 2015 19:46:27,105 DEBUG org.apache.activemq.ActiveMQConnection - notified failover transport (unconnected) of pending interruption processing for: ID:dev.vagrant.local-33222-1424101620085-1:6863
16 Feb 2015 19:46:27,106 DEBUG ache.activemq.transport.failover.FailoverTransport - urlList connectionList:[tcp://10.0.0.1:61616, tcp://10.0.0.2:61616], from: [tcp://10.0.0.1:61616, tcp://10.0.0.2:61616]
16 Feb 2015 19:46:27,107 DEBUG ache.activemq.transport.failover.FailoverTransport - Attempting 0th connect to: tcp://10.0.0.1:61616
16 Feb 2015 19:46:27,109 DEBUG org.apache.activemq.transport.WireFormatNegotiator - Sending: WireFormatInfo { version=9, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
16 Feb 2015 19:46:27,109 DEBUG ache.activemq.transport.failover.FailoverTransport - Connection established
16 Feb 2015 19:46:27,109 INFO ache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://10.0.0.1:61616
16 Feb 2015 19:46:27,113 DEBUG org.apache.activemq.transport.InactivityMonitor - Using min of local: WireFormatInfo { version=9, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=5, properties={CacheSize=1024, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, CacheEnabled=true, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
16 Feb 2015 19:46:27,114 DEBUG org.apache.activemq.transport.WireFormatNegotiator - Received WireFormat: WireFormatInfo { version=5, properties={CacheSize=1024, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, CacheEnabled=true, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
16 Feb 2015 19:46:27,114 DEBUG org.apache.activemq.transport.WireFormatNegotiator - tcp:///10.0.0.1:61616@40115 before negotiation: OpenWireFormat{version=9, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
16 Feb 2015 19:46:27,114 DEBUG org.apache.activemq.transport.WireFormatNegotiator - tcp:///10.0.0.1:61616@40115 after negotiation: OpenWireFormat{version=5, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
16 Feb 2015 19:46:27,121 DEBUG org.apache.activemq.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Session Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@1cf53b77[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
16 Feb 2015 19:46:28,123 DEBUG org.apache.activemq.ActiveMQMessageConsumer - remove: ID:dev.vagrant.local-33222-1424101620085-1:6864:1:1, lastDeliveredSequenceId:0
16 Feb 2015 19:46:28,124 DEBUG org.apache.activemq.ActiveMQSession - ID:dev.vagrant.local-33222-1424101620085-1:6864:1 Transaction Commit :null
16 Feb 2015 19:46:28,126 DEBUG org.apache.activemq.util.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@1cf53b77[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1] is shutdown: true and terminated: true took: 0.000 seconds.
16 Feb 2015 19:46:28,127 DEBUG org.apache.activemq.util.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@39d64d12[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] is shutdown: true and terminated: true took: 0.000 seconds.
16 Feb 2015 19:46:28,128 DEBUG ache.activemq.transport.failover.FailoverTransport - Stopped tcp://10.0.0.1:61616
16 Feb 2015 19:46:28,128 DEBUG org.apache.activemq.util.ThreadPoolUtils - Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@5a12ff9c[Running, pool size = 2, active threads = 0, queued tasks = 0, completed tasks = 2]
16 Feb 2015 19:46:28,129 DEBUG org.apache.activemq.transport.tcp.TcpTransport - Stopping transport tcp:///10.0.0.1:61616@40115
16 Feb 2015 19:46:28,130 DEBUG org.apache.activemq.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@372d510a[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
16 Feb 2015 19:46:28,130 DEBUG org.apache.activemq.transport.tcp.TcpTransport$1 - Closed socket Socket[addr=/10.0.0.1,port=61616,localport=40115]
16 Feb 2015 19:46:28,131 DEBUG org.apache.activemq.util.ThreadPoolUtils - Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@372d510a[Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]
16 Feb 2015 19:46:28,131 DEBUG org.apache.activemq.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@3cbab1f9[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
16 Feb 2015 19:46:28,131 DEBUG ache.activemq.transport.failover.FailoverTransport - Reconnect was triggered but transport is not started yet. Wait for start to connect the transport.
16 Feb 2015 19:46:28,132 DEBUG ache.activemq.transport.failover.FailoverTransport - Started unconnected
16 Feb 2015 19:46:28,133 DEBUG ache.activemq.transport.failover.FailoverTransport - Waking up reconnect task
16 Feb 2015 19:46:28,133 DEBUG ache.activemq.transport.failover.FailoverTransport - urlList connectionList:[tcp://10.0.0.1:61616, tcp://10.0.0.2:61616], from: [tcp://10.0.0.1:61616, tcp://10.0.0.2:61616]
16 Feb 2015 19:46:28,134 DEBUG ache.activemq.transport.failover.FailoverTransport - Attempting 0th connect to: tcp://10.0.0.1:61616
16 Feb 2015 19:46:28,136 DEBUG org.apache.activemq.transport.WireFormatNegotiator - Sending: WireFormatInfo { version=9, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
16 Feb 2015 19:46:28,136 DEBUG ache.activemq.transport.failover.FailoverTransport - Connection established
16 Feb 2015 19:46:28,136 INFO ache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://10.0.0.1:61616
16 Feb 2015 19:46:28,142 DEBUG org.apache.activemq.transport.InactivityMonitor - Using min of local: WireFormatInfo { version=9, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=5, properties={CacheSize=1024, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, CacheEnabled=true, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
16 Feb 2015 19:46:28,144 DEBUG org.apache.activemq.transport.WireFormatNegotiator - Received WireFormat: WireFormatInfo { version=5, properties={CacheSize=1024, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, CacheEnabled=true, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
16 Feb 2015 19:46:28,144 DEBUG org.apache.activemq.transport.WireFormatNegotiator - tcp:///10.0.0.1:61616@40116 before negotiation: OpenWireFormat{version=9, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
16 Feb 2015 19:46:28,144 DEBUG org.apache.activemq.transport.WireFormatNegotiator - tcp:///10.0.0.1:61616@40116 after negotiation: OpenWireFormat{version=5, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
16 Feb 2015 19:46:28,155 DEBUG org.apache.activemq.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Session Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@5e95c519[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
16 Feb 2015 19:46:28,394 DEBUG che.activemq.transport.AbstractInactivityMonitor$2 - WriteChecker 10000 ms elapsed since last write check.
16 Feb 2015 19:46:29,156 DEBUG org.apache.activemq.ActiveMQMessageConsumer - remove: ID:dev.vagrant.local-33222-1424101620085-1:6865:1:1, lastDeliveredSequenceId:0
16 Feb 2015 19:46:29,157 DEBUG org.apache.activemq.ActiveMQSession - ID:dev.vagrant.local-33222-1424101620085-1:6865:1 Transaction Commit :null
16 Feb 2015 19:46:29,160 DEBUG org.apache.activemq.util.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@5e95c519[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1] is shutdown: true and terminated: true took: 0.000 seconds.
16 Feb 2015 19:46:29,160 DEBUG org.apache.activemq.util.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@5af5d17a[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] is shutdown: true and terminated: true took: 0.000 seconds.
16 Feb 2015 19:46:29,160 DEBUG ache.activemq.transport.failover.FailoverTransport - Stopped tcp://10.0.0.1:61616
16 Feb 2015 19:46:29,162 DEBUG org.apache.activemq.transport.tcp.TcpTransport - Stopping transport tcp:///10.0.0.1:61616@40116
16 Feb 2015 19:46:29,162 DEBUG org.apache.activemq.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@5b20212e[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
16 Feb 2015 19:46:29,163 DEBUG org.apache.activemq.util.ThreadPoolUtils - Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@3cbab1f9[Running, pool size = 2, active threads = 0, queued tasks = 0, completed tasks = 2]
16 Feb 2015 19:46:29,164 DEBUG org.apache.activemq.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@6331ee[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
16 Feb 2015 19:46:29,164 DEBUG ache.activemq.transport.failover.FailoverTransport - Reconnect was triggered but transport is not started yet. Wait for start to connect the transport.
16 Feb 2015 19:46:29,165 DEBUG ache.activemq.transport.failover.FailoverTransport - Started unconnected
16 Feb 2015 19:46:29,165 DEBUG ache.activemq.transport.failover.FailoverTransport - Waking up reconnect task
16 Feb 2015 19:46:29,165 DEBUG org.apache.activemq.transport.tcp.TcpTransport$1 - Closed socket Socket[addr=/10.0.0.1,port=61616,localport=40116]
16 Feb 2015 19:46:29,166 DEBUG org.apache.activemq.util.ThreadPoolUtils - Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@5b20212e[Running, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 1]
16 Feb 2015 19:46:29,166 WARN ache.activemq.transport.failover.FailoverTransport - Transport (tcp://10.0.0.1:61616) failed, reason: java.io.EOFException, not attempting to automatically reconnect
16 Feb 2015 19:46:29,167 DEBUG org.apache.activemq.ActiveMQConnection - transport interrupted, dispatchers: 0
16 Feb 2015 19:46:29,167 DEBUG org.apache.activemq.ActiveMQConnection - notified failover transport (unconnected) of pending interruption processing for: ID:dev.vagrant.local-33222-1424101620085-1:6865
16 Feb 2015 19:46:29,167 DEBUG ache.activemq.transport.failover.FailoverTransport - urlList connectionList:[tcp://10.0.0.1:61616, tcp://10.0.0.2:61616], from: [tcp://10.0.0.1:61616, tcp://10.0.0.2:61616]
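I don't know what is wrong. Please help.
It looks like something is closing the connection after 1 second. Turn on TRACE-level logging; the pooled connection factory emits more log output at that level.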
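The one-second cadence can be checked directly from the timestamps of the "Successfully connected to" lines. The following is only a minimal sketch: the class name ReconnectIntervals and the method intervalsMillis are hypothetical helpers, not part of ActiveMQ or Spring, and the timestamp layout is assumed to match the log excerpt in the question.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

public class ReconnectIntervals {

    // Assumed timestamp layout, taken from the log excerpt: "16 Feb 2015 16:37:40,491".
    private static final DateTimeFormatter TS =
            DateTimeFormatter.ofPattern("dd MMM yyyy HH:mm:ss,SSS", Locale.ENGLISH);
    private static final int TS_LENGTH = 24;

    /** Returns the gaps, in milliseconds, between consecutive "Successfully connected to" lines. */
    public static List<Long> intervalsMillis(List<String> logLines) {
        List<Long> gaps = new ArrayList<>();
        LocalDateTime previous = null;
        for (String line : logLines) {
            // Skip lines that are not connect events or are too short to hold a timestamp.
            if (!line.contains("Successfully connected to") || line.length() < TS_LENGTH) {
                continue;
            }
            LocalDateTime current = LocalDateTime.parse(line.substring(0, TS_LENGTH), TS);
            if (previous != null) {
                gaps.add(Duration.between(previous, current).toMillis());
            }
            previous = current;
        }
        return gaps;
    }

    public static void main(String[] args) {
        // First three INFO lines from the question's log.
        List<String> lines = Arrays.asList(
            "16 Feb 2015 16:37:40,491 INFO ache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://10.0.0.1:61616",
            "16 Feb 2015 16:37:41,499 INFO ache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://10.0.0.1:61616",
            "16 Feb 2015 16:37:42,508 INFO ache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://10.0.0.1:61616");
        System.out.println(intervalsMillis(lines)); // prints [1008, 1009]
    }
}
```

Gaps that stay this close to 1000 ms suggest a fixed timeout or polling interval on the client side rather than random network failures.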
Alternatively, try using Spring's CachingConnectionFactory instead of PooledConnectionFactory.
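A minimal sketch of that swap, written as Java configuration, might look like the following. The class name CachingJmsConfig, the bean names, the hard-coded broker URL, and the session cache size of 10 are illustrative assumptions, not values from the question. Whether this stops the reconnect loop is not confirmed here; it only shows how the factory would be wired.

```java
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;

@Configuration
public class CachingJmsConfig {

    // Assumption: the failover URL from the question, inlined for brevity.
    private static final String BROKER_URL =
            "failover:(tcp://10.0.0.1:61616,tcp://10.0.0.2:61616)?randomize=false";

    @Bean
    public CachingConnectionFactory cachingJmsConnectionFactory() {
        // Same target factory as in the XML: plain ActiveMQ factory with async send.
        ActiveMQConnectionFactory target = new ActiveMQConnectionFactory(BROKER_URL);
        target.setUseAsyncSend(true);

        // CachingConnectionFactory keeps one shared connection open and caches sessions.
        CachingConnectionFactory caching = new CachingConnectionFactory(target);
        caching.setSessionCacheSize(10); // illustrative value, tune as needed
        return caching;
    }

    @Bean
    public JmsTemplate jmsTemplate() {
        // The template now borrows sessions from the cached, long-lived connection.
        return new JmsTemplate(cachingJmsConnectionFactory());
    }
}
```

Because CachingConnectionFactory holds a single shared connection, the JmsTemplate no longer opens and closes a physical connection around each send, which is the behavior the debug log appears to show.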