参考地址:https://www.rabbitmq.com/api-guide.html
版本: rabbitmq-amqp-client:5.1.2
// 创建一个连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置rabbitmq 服务端所在地址 我这里在本地就是本地
connectionFactory.setHost("127.0.0.1");
// 设置端口号,连接用户名,虚拟地址等
connectionFactory.setPort(5672);
connectionFactory.setUsername("jimmy");
connectionFactory.setPassword("123456");
connectionFactory.setVirtualHost("/");
return connectionFactory.newConnection();
ConnectionFactory
Connection
从connectionFactory.newConnection()
分析示例中代码的涉及到创建链接的主要节点
AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addressResolver, metricsCollector);
conn.init();
->
this.cf.newConnection()
->
FrameHandler frameHandler = factory.create(addr, connectionName());
->
RecoveryAwareAMQConnection conn = createConnection(params, frameHandler, metricsCollector);
conn.start();
涉及代码量还是比较多的。这里为了方便分析需要先了解下 ConnectionFactory
和Connection
的源码(当然不是一下子都能掌握,先找几个后面能用到的浪)。
// 几个基本信息
private String username = DEFAULT_USER;
private String password = DEFAULT_PASS;
private String virtualHost = DEFAULT_VHOST;
private String host = DEFAULT_HOST;
private int port = USE_DEFAULT_PORT;
// 一些超时、最大值限制
private int requestedChannelMax = DEFAULT_CHANNEL_MAX;
private int requestedFrameMax = DEFAULT_FRAME_MAX;
private int requestedHeartbeat = DEFAULT_HEARTBEAT;
private int connectionTimeout = DEFAULT_CONNECTION_TIMEOUT;
private int handshakeTimeout = DEFAULT_HANDSHAKE_TIMEOUT;
private int shutdownTimeout = DEFAULT_SHUTDOWN_TIMEOUT;
private Map<String, Object> _clientProperties = AMQConnection.defaultClientProperties();
private SocketFactory socketFactory = null;
private SaslConfig saslConfig = DefaultSaslConfig.PLAIN;
private ExecutorService sharedExecutor;
private ThreadFactory threadFactory = Executors.defaultThreadFactory();
private ExecutorService shutdownExecutor;
// 心跳线程池
private ScheduledExecutorService heartbeatExecutor;
private SocketConfigurator socketConf = new DefaultSocketConfigurator();
private ExceptionHandler exceptionHandler = new DefaultExceptionHandler();
// 是否自动重连
private boolean automaticRecovery = true;
private boolean topologyRecovery = true;
private long networkRecoveryInterval = DEFAULT_NETWORK_RECOVERY_INTERVAL;
private RecoveryDelayHandler recoveryDelayHandler;
// 提供的扩展点,提供了部分节点的时候的处理方法。
private MetricsCollector metricsCollector;
// 是否采用nio模式
private boolean nio = false;
// frameHandler工厂
private FrameHandlerFactory frameHandlerFactory;
private NioParams nioParams = new NioParams();
// 安全工厂
private SslContextFactory sslContextFactory;
private int channelRpcTimeout = DEFAULT_CHANNEL_RPC_TIMEOUT;
private boolean channelShouldCheckRpcResponseType = false;
connection
实例对象)// 重要的属性,具体怎么叫等我掌握整体流程的时候或许可以稍微取一个名字
private final RecoveryAwareAMQConnectionFactory cf;
// con的channel集合
private final Map<Integer, AutorecoveringChannel> channels;
// 链接信息
private final ConnectionParams params;
private volatile RecoveryAwareAMQConnection delegate;
private final List<ShutdownListener> shutdownHooks = Collections.synchronizedList(new ArrayList<ShutdownListener>());
private final List<RecoveryListener> recoveryListeners = Collections.synchronizedList(new ArrayList<RecoveryListener>());
private final List<BlockedListener> blockedListeners = Collections.synchronizedList(new ArrayList<BlockedListener>());
// Records topology changes
private final Map<String, RecordedQueue> recordedQueues = Collections.synchronizedMap(new LinkedHashMap<String, RecordedQueue>());
private final List<RecordedBinding> recordedBindings = Collections.synchronizedList(new ArrayList<RecordedBinding>());
private final Map<String, RecordedExchange> recordedExchanges = Collections.synchronizedMap(new LinkedHashMap<String, RecordedExchange>());
private final Map<String, RecordedConsumer> consumers = Collections.synchronizedMap(new LinkedHashMap<String, RecordedConsumer>());
private final List<ConsumerRecoveryListener> consumerRecoveryListeners = Collections.synchronizedList(new ArrayList<ConsumerRecoveryListener>());
private final List<QueueRecoveryListener> queueRecoveryListeners = Collections.synchronizedList(new ArrayList<QueueRecoveryListener>());
// Used to block connection recovery attempts after close() is invoked.
private volatile boolean manuallyClosed = false;
// This lock guards the manuallyClosed flag and the delegate connection. Guarding these two ensures that a new connection can never
// be created after application code has initiated shutdown.
private final Object recoveryLock = new Object();
private final ConnectionParams params;
private final FrameHandlerFactory factory;
private final AddressResolver addressResolver;
private final MetricsCollector metricsCollector;
connectionFactory.newConnection()
跟进去之后最后调用的方式connectionFactory.newConnection( executor, addressResolver, clientProvidedName)
。
首先这个方法是要创建一个connection
即获取一个connection
实例。而获取这个实例是要从工厂类RecoveryAwareAMQConnectionFactory
中获取的。就好比现在我们跟神龙许愿一样,要是想要神龙满足我们的愿望,就要凑齐七颗龙珠(❤李)。那这条龙的龙珠是什么呢?这就要找到RecoveryAwareAMQConnectionFactory
生成实例的方法查看下如何创建的(其实还是点进去看而已)。
下面就是创建实例的方法RecoveryAwareAMQConnectionFactory.createConnection(ConnectionParams,FrameHandler,MetricsCollector )
(太长了凑合看,主要看参数就好)。
那这里的七龙珠就分别是 ConnectionParams
FrameHandler
MetricsCollector
那这回我们回到connectionFactory.newConnection( executor, addressResolver, clientProvidedName)
,翠花!上酸菜。(这里挑这次要分析的酸菜来,其他的在说).
...
if(this.metricsCollector == null) {
this.metricsCollector = new NoOpMetricsCollector();
}
// make sure we respect the provided thread factory
FrameHandlerFactory fhFactory = createFrameHandlerFactory();
ConnectionParams params = params(executor);
// set client-provided via a client property
if (clientProvidedName != null) {
Map<String, Object> properties = new HashMap<String, Object>(params.getClientProperties());
properties.put("connection_name", clientProvidedName);
params.setClientProperties(properties);
}
if (isAutomaticRecoveryEnabled()) {
// see com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory#newConnection
AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addressResolver, metricsCollector);
conn.init();
return conn;
} else {
...
}
}
很显然,前几行代码就是在准备龙珠:
// 七龙珠1号 MetricsCollector
if(this.metricsCollector == null) {
this.metricsCollector = new NoOpMetricsCollector();
}
// 七龙珠2号生产厂家 FrameHandlerFactory
FrameHandlerFactory fhFactory = createFrameHandlerFactory();
// 七龙珠3号 ConnectionParams
ConnectionParams params = params(executor);
// set client-provided via a client property
if (clientProvidedName != null) {
Map<String, Object> properties = new HashMap<String,Object(params.getClientProperties());
properties.put("connection_name", clientProvidedName);
params.setClientProperties(properties);
}
接下来就是七龙珠合体了。
// 组装神龙
AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addressResolver, metricsCollector);
// 许愿,给我一个connection吧
conn.init();
那来看下rabbotmq
是如何组装神龙和许愿的
public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f, AddressResolver addressResolver, MetricsCollector metricsCollector) {
// 生合成conn的工厂(愿望生成器)
this.cf = new RecoveryAwareAMQConnectionFactory(params, f, addressResolver, metricsCollector);
// 属性参数(密码、账号之类的东西)
this.params = params;
// conn的 channel集合
this.channels = new ConcurrentHashMap<Integer, AutorecoveringChannel>();
}
/**
* Private API.
* @throws IOException
* @see com.rabbitmq.client.ConnectionFactory#newConnection(java.util.concurrent.ExecutorService)
*/
public void init() throws IOException, TimeoutException {
// 使用愿望生成器获取愿望
this.delegate = this.cf.newConnection();
// 自动重连监听器 神龙掉线了就不能实现愿望了,所以要监听它⏱
this.addAutomaticRecoveryListener(delegate);
}
来看看愿望生成器是如何生成愿望的,生成的时候都干了啥
public RecoveryAwareAMQConnection newConnection() throws IOException, TimeoutException {
Exception lastException = null;
List<Address> shuffled = shuffle(addressResolver.getAddresses());
for (Address addr : shuffled) {
try {
// 获取frameHandler(通过七龙珠2号生成器生成七龙珠2号)
// 这里面的源码我不展开读取了,返回了一个包装socket的SocketFrameHandler对象
FrameHandler frameHandler = factory.create(addr, connectionName());
// 创建conn (创建愿望)
RecoveryAwareAMQConnection conn = createConnection(params, frameHandler, metricsCollector);
// 启动conn (实现愿望)
conn.start();
// 扩展点
metricsCollector.newConnection(conn);
return conn;
} catch (IOException e) {
lastException = e;
} catch (TimeoutException te) {
lastException = te;
}
}
if (lastException != null) {
if (lastException instanceof IOException) {
throw (IOException) lastException;
} else if (lastException instanceof TimeoutException) {
throw (TimeoutException) lastException;
}
}
throw new IOException("failed to connect");
}
最后在来看看conn的启动(太长了,不想看)
public void start() throws IOException, TimeoutException {
// 初始化消费服务,用于处理channel
initializeConsumerWorkService();
// 初始化心跳
initializeHeartbeatSender();
this._running = true;
// Make sure that the first thing we do is to send the header,
// which should cause any socket errors to show up for us, rather
// than risking them pop out in the MainLoop
AMQChannel.SimpleBlockingRpcContinuation connStartBlocker =
new AMQChannel.SimpleBlockingRpcContinuation();
// We enqueue an RPC continuation here without sending an RPC
// request, since the protocol specifies that after sending
// the version negotiation header, the client (connection
// initiator) is to wait for a connection.start method to
// arrive.
// 将rpc任务放入channel(阻塞),最后放入工作线程
_channel0.enqueueRpc(connStartBlocker);
try {
// The following two lines are akin to AMQChannel's
// transmit() method for this pseudo-RPC.
// 设置超时时间发送header,这里是将一些数据写入到outputStream
_frameHandler.setTimeout(handshakeTimeout);
_frameHandler.sendHeader();
} catch (IOException ioe) {
_frameHandler.close();
throw ioe;
}
// 开启主线程 MainLoop
this._frameHandler.initialize(this);
AMQP.Connection.Start connStart;
AMQP.Connection.Tune connTune = null;
try {
// Connection.Start
connStart = (AMQP.Connection.Start)connStartBlocker.getReply(handshakeTimeout/2).getMethod();
// 服务器信息
_serverProperties = Collections.unmodifiableMap(connStart.getServerProperties());
Version serverVersion =
new Version(connStart.getVersionMajor(),connStart.getVersionMinor());
if (!Version.checkVersion(clientVersion, serverVersion)) {
throw new ProtocolVersionMismatchException(clientVersion,serverVersion);
}
String[] mechanisms = connStart.getMechanisms().toString().split(" ");
SaslMechanism sm = this.saslConfig.getSaslMechanism(mechanisms);
if (sm == null) {
throw new IOException("No compatible authentication mechanism found - " +
"server offered [" + connStart.getMechanisms() + "]");
}
LongString challenge = null;
LongString response = sm.handleChallenge(null, this.username, this.password);
do {
Method method = (challenge == null)
? new AMQP.Connection.StartOk.Builder()
.clientProperties(_clientProperties)
.mechanism(sm.getName())
.response(response)
.build()
: new AMQP.Connection.SecureOk.Builder().response(response).build();
try {
// Connection.Tune
Method serverResponse =
_channel0.rpc(method, handshakeTimeout/2).getMethod();
if (serverResponse instanceof AMQP.Connection.Tune) {
connTune = (AMQP.Connection.Tune) serverResponse;
} else {
challenge = ((AMQP.Connection.Secure) serverResponse).getChallenge();
response = sm.handleChallenge(challenge, this.username,this.password);
}
} catch (ShutdownSignalException e) {
Method shutdownMethod = e.getReason();
if (shutdownMethod instanceof AMQP.Connection.Close) {
AMQP.Connection.Close shutdownClose =
(AMQP.Connection.Close) shutdownMethod;
if (shutdownClose.getReplyCode() == AMQP.ACCESS_REFUSED) {
throw new AuthenticationFailureException(shutdownClose.getReplyText());
}
}
throw new PossibleAuthenticationFailureException(e);
}
} while (connTune == null);
} catch (TimeoutException te) {
_frameHandler.close();
throw te;
} catch (ShutdownSignalException sse) {
_frameHandler.close();
throw AMQChannel.wrap(sse);
} catch(IOException ioe) {
_frameHandler.close();
throw ioe;
}
try {
// channelMax
int channelMax =
negotiateChannelMax(this.requestedChannelMax,connTune.getChannelMax());
// ChannelManager
_channelManager = instantiateChannelManager(channelMax, threadFactory);
// frameMax
int frameMax =
negotiatedMaxValue(this.requestedFrameMax,connTune.getFrameMax());
this._frameMax = frameMax;
int heartbeat =
negotiatedMaxValue(this.requestedHeartbeat,connTune.getHeartbeat());
// 启动心跳
setHeartbeat(heartbeat);
_channel0.transmit(new AMQP.Connection.TuneOk.Builder()
.channelMax(channelMax)
.frameMax(frameMax)
.heartbeat(heartbeat)
.build());
_channel0.exnWrappingRpc(new AMQP.Connection.Open.Builder()
.virtualHost(_virtualHost)
.build());
} catch (IOException ioe) {
_heartbeatSender.shutdown();
_frameHandler.close();
throw ioe;
} catch (ShutdownSignalException sse) {
_heartbeatSender.shutdown();
_frameHandler.close();
throw AMQChannel.wrap(sse);
}
// We can now respond to errors having finished tailoring the connection
this._inConnectionNegotiation = false;
}
接下来在看下mainLoop
线程中做了啥。
public void run() {
try {
while (_running) {
Frame frame = _frameHandler.readFrame();
// 处理帧
readFrame(frame);
}
} catch (Throwable ex) {
handleFailure(ex);
} finally {
doFinalShutdown();
}
}
// readFrame(frame)内容
private void readFrame(Frame frame) throws IOException {
if (frame != null) {
_missedHeartbeats = 0;
// 如果是心跳帧则忽略
if (frame.type == AMQP.FRAME_HEARTBEAT) {
// Ignore it: we've already just reset the heartbeat counter.
} else {
if (frame.channel == 0) { // the special channel
// 特殊帧
_channel0.handleFrame(frame);
} else {
if (isOpen()) {
// If we're still _running, but not isOpen(), then we
// must be quiescing, which means any inbound frames
// for non-zero channels (and any inbound commands on
// channel zero that aren't Connection.CloseOk) must
// be discarded.
ChannelManager cm = _channelManager;
if (cm != null) {
ChannelN channel;
try {
channel = cm.getChannel(frame.channel);
} catch(UnknownChannelException e) {
// this can happen if channel has been closed,
// but there was e.g. an in-flight delivery.
// just ignoring the frame to avoid closing the whole connection
LOGGER.info("Received a frame on an unknown channel, ignoring it");
return;
}
// 处理
channel.handleFrame(frame);
}
}
}
}
} else {
// Socket timeout waiting for a frame.
// Maybe missed heartbeat.
handleSocketTimeout();
}
}
// channel.handleFrame 后面在进行分析
public void handleFrame(Frame frame) throws IOException {
AMQCommand command = _command;
if (command.handleFrame(frame)) { // a complete command has rolled off the assembly line
_command = new AMQCommand(); // prepare for the next one
handleCompleteInboundCommand(command);
}
}
总结:
connectionFactory.newConnection()
- AutorecoveringConnection.init()
- RecoveryAwareAMQConnectionFactory.newConnection()
- RecoveryAwareAMQConnection.start()
步骤描述:
connectionFactory
组装FrameHandlerFactory
、 MetricsCollector
、 ConnectionParams
构造AutorecoveringConnection
实例这个实例中持有FrameHandlerFactory
ConnectionParams
(这个参数能构造RecoveryAwareAMQConnectionFactory
实例cf
)
metricsCollector
以及连接地址信息。
通过AutorecoveringConnection.init()
来初始化A-conn
(这里用A-表示是AutorecoveringConnection,下同)
A-conn
持有的cf
来创建 RecoveryAwareAMQConnection
实例delegate
FrameHandler
ConnectionParams
、操作句柄FrameHandler
、 扩展组件metricsCollector
传入RecoveryAwareAMQConnection
的构造方法获取相应实例R-conn
R-conn.start()
启动链接然后返回后记:读起来还是比较费劲的,这个源码估计以后我还得在看2遍才能吃透一部分。目前就是先梳理一部分把,肯定会有遗漏。