
MQ! Rabbit-client: source code analysis of connection establishment

易炳
2023-12-01


Reference: https://www.rabbitmq.com/api-guide.html


Version: rabbitmq-amqp-client:5.1.2

Sample code for creating a connection:

// Create a connection factory
ConnectionFactory connectionFactory = new ConnectionFactory();
// Set the address of the RabbitMQ server; here it runs on the local machine
connectionFactory.setHost("127.0.0.1");
// Set the port, username, password, and virtual host
connectionFactory.setPort(5672);
connectionFactory.setUsername("jimmy");
connectionFactory.setPassword("123456");
connectionFactory.setVirtualHost("/");
return connectionFactory.newConnection();

Source code analysis

  • ConnectionFactory
  • Connection

Preparation

These are the main steps that connectionFactory.newConnection() in the sample code goes through to create a connection:

AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addressResolver, metricsCollector);
conn.init();
-> 
this.cf.newConnection()
->
FrameHandler frameHandler = factory.create(addr, connectionName());
-> 
RecoveryAwareAMQConnection conn = createConnection(params, frameHandler, metricsCollector);
conn.start();

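Quite a lot of code is involved. To make the analysis easier, we first need a rough picture of the ConnectionFactory and Connection sources (nobody absorbs all of it at once, so we only pick out the pieces that are needed later).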

  • ConnectionFactory
// Basic connection settings
private String username                       = DEFAULT_USER;
private String password                       = DEFAULT_PASS;
private String virtualHost                    = DEFAULT_VHOST;
private String host                           = DEFAULT_HOST;
private int port                              = USE_DEFAULT_PORT;

// Timeouts and maximum-value limits
private int requestedChannelMax               = DEFAULT_CHANNEL_MAX;
private int requestedFrameMax                 = DEFAULT_FRAME_MAX;
private int requestedHeartbeat                = DEFAULT_HEARTBEAT;
private int connectionTimeout                 = DEFAULT_CONNECTION_TIMEOUT;
private int handshakeTimeout                  = DEFAULT_HANDSHAKE_TIMEOUT;
private int shutdownTimeout                   = DEFAULT_SHUTDOWN_TIMEOUT;


private Map<String, Object> _clientProperties = AMQConnection.defaultClientProperties();
private SocketFactory socketFactory           = null;
private SaslConfig saslConfig                 = DefaultSaslConfig.PLAIN;
private ExecutorService sharedExecutor;
private ThreadFactory threadFactory           = Executors.defaultThreadFactory();
private ExecutorService shutdownExecutor;
// Thread pool for heartbeats
private ScheduledExecutorService heartbeatExecutor;
private SocketConfigurator socketConf         = new DefaultSocketConfigurator();
private ExceptionHandler exceptionHandler     = new DefaultExceptionHandler();
// Whether automatic connection recovery is enabled
private boolean automaticRecovery             = true;

private boolean topologyRecovery              = true;
private long networkRecoveryInterval          = DEFAULT_NETWORK_RECOVERY_INTERVAL;
private RecoveryDelayHandler recoveryDelayHandler;

// Extension point: callbacks invoked at certain points of the connection lifecycle
private MetricsCollector metricsCollector;

// Whether to use NIO mode
private boolean nio = false;

// Factory for FrameHandler instances
private FrameHandlerFactory frameHandlerFactory;

private NioParams nioParams = new NioParams();

// SSL context factory
private SslContextFactory sslContextFactory;
private int channelRpcTimeout = DEFAULT_CHANNEL_RPC_TIMEOUT;
private boolean channelShouldCheckRpcResponseType = false;
  • AutorecoveringConnection (the connection instance that the sample code ends up creating)
// An important field; I may give it a better name once I understand the whole flow
private final RecoveryAwareAMQConnectionFactory cf;

// The channels of this connection
private final Map<Integer, AutorecoveringChannel> channels;
// Connection parameters
private final ConnectionParams params;
private volatile RecoveryAwareAMQConnection delegate;

private final List<ShutdownListener> shutdownHooks  = Collections.synchronizedList(new ArrayList<ShutdownListener>());
private final List<RecoveryListener> recoveryListeners = Collections.synchronizedList(new ArrayList<RecoveryListener>());
private final List<BlockedListener> blockedListeners = Collections.synchronizedList(new ArrayList<BlockedListener>());

// Records topology changes
private final Map<String, RecordedQueue> recordedQueues = Collections.synchronizedMap(new LinkedHashMap<String, RecordedQueue>());
private final List<RecordedBinding> recordedBindings = Collections.synchronizedList(new ArrayList<RecordedBinding>());
private final Map<String, RecordedExchange> recordedExchanges = Collections.synchronizedMap(new LinkedHashMap<String, RecordedExchange>());
private final Map<String, RecordedConsumer> consumers = Collections.synchronizedMap(new LinkedHashMap<String, RecordedConsumer>());
private final List<ConsumerRecoveryListener> consumerRecoveryListeners = Collections.synchronizedList(new ArrayList<ConsumerRecoveryListener>());
private final List<QueueRecoveryListener> queueRecoveryListeners = Collections.synchronizedList(new ArrayList<QueueRecoveryListener>());

// Used to block connection recovery attempts after close() is invoked.
private volatile boolean manuallyClosed = false;

// This lock guards the manuallyClosed flag and the delegate connection.  Guarding these two ensures that a new connection can never
// be created after application code has initiated shutdown.  
private final Object recoveryLock = new Object();
  • RecoveryAwareAMQConnectionFactory (important enough to pull out and see what it holds)
private final ConnectionParams params;
private final FrameHandlerFactory factory;
private final AddressResolver addressResolver;
private final MetricsCollector metricsCollector;

Analysis

Stepping into connectionFactory.newConnection(), the call eventually lands in connectionFactory.newConnection(executor, addressResolver, clientProvidedName).

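First of all, this method has to create a connection, that is, obtain a connection instance, and that instance comes from the factory class RecoveryAwareAMQConnectionFactory. It is like making a wish to Shenron: if we want the dragon to grant our wish, we first have to collect the seven Dragon Balls. So what are this dragon's Dragon Balls? To find out, locate the method in RecoveryAwareAMQConnectionFactory that creates the instance and see how it works (in practice, just click through and read it).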

The method that creates the instance is RecoveryAwareAMQConnectionFactory.createConnection(ConnectionParams, FrameHandler, MetricsCollector) (it is long; the parameters are what matter).

So the Dragon Balls here are ConnectionParams, FrameHandler, and MetricsCollector.

Now back to connectionFactory.newConnection(executor, addressResolver, clientProvidedName). Waiter, bring out the dishes! (Only the parts analyzed this time are shown; the rest can wait.)

...
        if(this.metricsCollector == null) {
            this.metricsCollector = new NoOpMetricsCollector();
        }
        // make sure we respect the provided thread factory
        FrameHandlerFactory fhFactory = createFrameHandlerFactory();
        ConnectionParams params = params(executor);
        // set client-provided via a client property
        if (clientProvidedName != null) {
            Map<String, Object> properties = new HashMap<String, Object>(params.getClientProperties());
            properties.put("connection_name", clientProvidedName);
            params.setClientProperties(properties);
        }

        if (isAutomaticRecoveryEnabled()) {
            // see com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory#newConnection
            AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addressResolver, metricsCollector);

            conn.init();
            return conn;
        } else {
            ...
        }
    }

Clearly, the first few lines are collecting the Dragon Balls:

// Dragon Ball no. 1: MetricsCollector
if(this.metricsCollector == null) {
    this.metricsCollector = new NoOpMetricsCollector();
}
// The factory that produces Dragon Ball no. 2: FrameHandlerFactory
FrameHandlerFactory fhFactory = createFrameHandlerFactory();
// Dragon Ball no. 3: ConnectionParams
ConnectionParams params = params(executor);
// set client-provided via a client property
if (clientProvidedName != null) {
    Map<String, Object> properties = new HashMap<String, Object>(params.getClientProperties());
    properties.put("connection_name", clientProvidedName);
    params.setClientProperties(properties);
}
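The client-provided name handling above copies the property map before adding the connection_name key, presumably so that the map held by the factory is never modified. A minimal standalone sketch of that copy-then-put step follows; the class and method names (ClientPropertiesSketch, withConnectionName) are made up for illustration and are not part of the RabbitMQ client.

```java
import java.util.HashMap;
import java.util.Map;

final class ClientPropertiesSketch {
    /**
     * Returns a copy of base with "connection_name" added.
     * The map passed in is never modified; a null name returns base unchanged.
     */
    static Map<String, Object> withConnectionName(Map<String, Object> base, String name) {
        if (name == null) {
            return base;
        }
        // Copy first, so the caller's (possibly shared) map stays untouched
        Map<String, Object> copy = new HashMap<>(base);
        copy.put("connection_name", name);
        return copy;
    }

    public static void main(String[] args) {
        Map<String, Object> defaults = new HashMap<>();
        defaults.put("product", "RabbitMQ");
        Map<String, Object> named = withConnectionName(defaults, "order-service");
        System.out.println(named.get("connection_name"));
        System.out.println(defaults.containsKey("connection_name"));
    }
}
```

The name set this way is only a label sent to the broker as a client property; it does not affect how the connection is established.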

Next, the Dragon Balls are combined.

// Assemble the dragon
AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addressResolver, metricsCollector);
// Make the wish: give me a connection
conn.init();

Now let's see how RabbitMQ assembles the dragon and makes the wish.

public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f, AddressResolver addressResolver, MetricsCollector metricsCollector) {
        // The factory that produces the conn (the wish generator)
        this.cf = new RecoveryAwareAMQConnectionFactory(params, f, addressResolver, metricsCollector);
        // Connection parameters (username, password, and so on)
        this.params = params;
        // The channels of this conn
        this.channels = new ConcurrentHashMap<Integer, AutorecoveringChannel>();
}

/**
  * Private API.
  * @throws IOException
  * @see 	com.rabbitmq.client.ConnectionFactory#newConnection(java.util.concurrent.ExecutorService)
  */
public void init() throws IOException, TimeoutException {
    // Use the wish generator to obtain the wish
    this.delegate = this.cf.newConnection();
    // Automatic recovery listener: if the dragon goes offline it cannot grant wishes, so we watch it
    this.addAutomaticRecoveryListener(delegate);
}
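The relationship between the wrapper and its delegate can be sketched without the RabbitMQ classes. The following is a heavily simplified model, not the client's implementation: RawConnection, onShutdown, and the Supplier-based factory are hypothetical stand-ins, and the real recovery also waits for a recovery interval and restores topology, which is left out here.

```java
import java.util.function.Supplier;

final class AutoRecoverySketch {
    /** Hypothetical stand-in for the real connection; it only tracks whether it is open. */
    static final class RawConnection {
        private volatile boolean open = true;
        boolean isOpen() { return open; }
        void kill() { open = false; }
    }

    private final Supplier<RawConnection> factory;   // plays the role of cf
    private final Object recoveryLock = new Object();
    private volatile RawConnection delegate;
    private volatile boolean manuallyClosed = false;

    AutoRecoverySketch(Supplier<RawConnection> factory) {
        this.factory = factory;
    }

    /** Mirrors init(): create the first delegate. */
    void init() {
        this.delegate = factory.get();
    }

    /** Called when the delegate shuts down; only non-application shutdowns trigger recovery. */
    void onShutdown(boolean initiatedByApplication) {
        delegate.kill();
        if (initiatedByApplication) {
            return;
        }
        synchronized (recoveryLock) {
            // Never create a new delegate once close() has been called
            if (!manuallyClosed) {
                delegate = factory.get();
            }
        }
    }

    void close() {
        synchronized (recoveryLock) {
            manuallyClosed = true;
        }
        delegate.kill();
    }

    RawConnection delegate() {
        return delegate;
    }
}
```

The lock plus the manuallyClosed flag correspond to the recoveryLock and manuallyClosed fields listed earlier: together they make sure a recovery attempt cannot race with an explicit close.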

Let's see how the wish generator produces a wish, and what it does along the way.

public RecoveryAwareAMQConnection newConnection() throws IOException, TimeoutException {
        Exception lastException = null;
        List<Address> shuffled = shuffle(addressResolver.getAddresses());

        for (Address addr : shuffled) {
            try {
                // Obtain the frameHandler (the factory for Dragon Ball no. 2 produces Dragon Ball no. 2)
                // I do not expand this source here; it returns a SocketFrameHandler that wraps the socket
                FrameHandler frameHandler = factory.create(addr, connectionName());
                // Create the conn (create the wish)
                RecoveryAwareAMQConnection conn = createConnection(params, frameHandler, metricsCollector);
                // Start the conn (grant the wish)
                conn.start();
                // Extension point
                metricsCollector.newConnection(conn);
                return conn;
            } catch (IOException e) {
                lastException = e;
            } catch (TimeoutException te) {
                lastException = te;
            }
        }

        if (lastException != null) {
            if (lastException instanceof IOException) {
                throw (IOException) lastException;
            } else if (lastException instanceof TimeoutException) {
                throw (TimeoutException) lastException;
            }
        }
        throw new IOException("failed to connect");
    }
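The loop above boils down to a simple pattern: shuffle the candidate addresses, try them one by one, remember the last failure, and rethrow it only if every address fails. Here is a minimal standalone sketch of that pattern; the Connector interface and the use of plain strings as addresses are assumptions for illustration, and only IOException is handled, whereas the real method also tracks TimeoutException.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class AddressFailoverSketch {
    /** Hypothetical connector: opens a connection to one address or throws. */
    interface Connector<C> {
        C connect(String address) throws IOException;
    }

    static <C> C connectToAny(List<String> addresses, Connector<C> connector) throws IOException {
        // Shuffle a copy so that clients spread their connections across the nodes
        List<String> shuffled = new ArrayList<>(addresses);
        Collections.shuffle(shuffled);

        IOException last = null;
        for (String address : shuffled) {
            try {
                // The first address that works wins
                return connector.connect(address);
            } catch (IOException e) {
                // Remember the failure and move on to the next address
                last = e;
            }
        }
        if (last != null) {
            throw last;
        }
        // Only reached when the address list was empty
        throw new IOException("failed to connect");
    }
}
```

Note that the caller only ever sees the exception from the last address tried; failures from earlier addresses are dropped.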

Finally, let's look at how the conn is started (it is long, and I did not feel like reading all of it).

public void start() throws IOException, TimeoutException {
    // Initialize the consumer work service, which handles work for the channels
	initializeConsumerWorkService();
    // Initialize the heartbeat sender
	initializeHeartbeatSender();
	this._running = true;
	// Make sure that the first thing we do is to send the header,
	// which should cause any socket errors to show up for us, rather
	// than risking them pop out in the MainLoop
	AMQChannel.SimpleBlockingRpcContinuation connStartBlocker =
		new AMQChannel.SimpleBlockingRpcContinuation();
	// We enqueue an RPC continuation here without sending an RPC
	// request, since the protocol specifies that after sending
	// the version negotiation header, the client (connection
	// initiator) is to wait for a connection.start method to
	// arrive.
    // Register the blocking RPC continuation on channel 0; it completes when the reply arrives
	_channel0.enqueueRpc(connStartBlocker);
	try {
		// The following two lines are akin to AMQChannel's
		// transmit() method for this pseudo-RPC.
        // Set the timeout and send the header, i.e. write the protocol header bytes to the output stream
		_frameHandler.setTimeout(handshakeTimeout);
		_frameHandler.sendHeader();
	} catch (IOException ioe) {
		_frameHandler.close();
		throw ioe;
	}
	// Start the main loop thread (MainLoop)
	this._frameHandler.initialize(this);

	AMQP.Connection.Start connStart;
	AMQP.Connection.Tune connTune = null;
	try {
        // Connection.Start
		connStart = (AMQP.Connection.Start)connStartBlocker.getReply(handshakeTimeout/2).getMethod();
        // Server properties
        _serverProperties = Collections.unmodifiableMap(connStart.getServerProperties());
		Version serverVersion =
				new Version(connStart.getVersionMajor(),connStart.getVersionMinor());

		if (!Version.checkVersion(clientVersion, serverVersion)) {
			throw new ProtocolVersionMismatchException(clientVersion,serverVersion);
		}

		String[] mechanisms = connStart.getMechanisms().toString().split(" ");
		SaslMechanism sm = this.saslConfig.getSaslMechanism(mechanisms);
		if (sm == null) {
			throw new IOException("No compatible authentication mechanism found - " +
										  "server offered [" + connStart.getMechanisms() + "]");
		}

		LongString challenge = null;
		LongString response = sm.handleChallenge(null, this.username, this.password);

		do {
			Method method = (challenge == null)
									? new AMQP.Connection.StartOk.Builder()
											  .clientProperties(_clientProperties)
											  .mechanism(sm.getName())
											  .response(response)
											  .build()
									: new AMQP.Connection.SecureOk.Builder().response(response).build();

			try {
                // Connection.Tune
				Method serverResponse = 
                    _channel0.rpc(method, handshakeTimeout/2).getMethod();
				if (serverResponse instanceof AMQP.Connection.Tune) {
					connTune = (AMQP.Connection.Tune) serverResponse;
				} else {
					challenge = ((AMQP.Connection.Secure) serverResponse).getChallenge();
					response = sm.handleChallenge(challenge, this.username,this.password);
				}
			} catch (ShutdownSignalException e) {
				Method shutdownMethod = e.getReason();
				if (shutdownMethod instanceof AMQP.Connection.Close) {
					AMQP.Connection.Close shutdownClose = 
                        (AMQP.Connection.Close) shutdownMethod;
					if (shutdownClose.getReplyCode() == AMQP.ACCESS_REFUSED) {
						throw new AuthenticationFailureException(shutdownClose.getReplyText());
					}
				}
				throw new PossibleAuthenticationFailureException(e);
			}
		} while (connTune == null);
	} catch (TimeoutException te) {
		_frameHandler.close();
		throw te;
	} catch (ShutdownSignalException sse) {
		_frameHandler.close();
		throw AMQChannel.wrap(sse);
	} catch(IOException ioe) {
		_frameHandler.close();
		throw ioe;
	}

	try {
        // channelMax
		int channelMax =
			negotiateChannelMax(this.requestedChannelMax,connTune.getChannelMax());
        // ChannelManager
		_channelManager = instantiateChannelManager(channelMax, threadFactory);
        // frameMax
		int frameMax =
			negotiatedMaxValue(this.requestedFrameMax,connTune.getFrameMax());
		this._frameMax = frameMax;
        
		int heartbeat =
			negotiatedMaxValue(this.requestedHeartbeat,connTune.getHeartbeat());
		
        // Start the heartbeat
        setHeartbeat(heartbeat);

		_channel0.transmit(new AMQP.Connection.TuneOk.Builder()
							.channelMax(channelMax)
							.frameMax(frameMax)
							.heartbeat(heartbeat)
						  .build());
		_channel0.exnWrappingRpc(new AMQP.Connection.Open.Builder()
								  .virtualHost(_virtualHost)
								.build());
	} catch (IOException ioe) {
		_heartbeatSender.shutdown();
		_frameHandler.close();
		throw ioe;
	} catch (ShutdownSignalException sse) {
		_heartbeatSender.shutdown();
		_frameHandler.close();
		throw AMQChannel.wrap(sse);
	}

	// We can now respond to errors having finished tailoring the connection
	this._inConnectionNegotiation = false;
}
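The tuning step calls negotiatedMaxValue for frameMax and heartbeat, but the article does not show its body. The sketch below is my understanding of the rule, so treat the exact body as an assumption: a value of 0 means "no limit requested", in which case the other side's value is used; otherwise the smaller of the two wins. The class name TuneNegotiationSketch is made up for illustration.

```java
final class TuneNegotiationSketch {
    /**
     * Combines the value requested by the client with the value proposed by the server.
     * 0 is treated as "no limit", so if either side says 0 the other side's value is used;
     * otherwise the stricter (smaller) value is chosen.
     */
    static int negotiatedMaxValue(int clientValue, int serverValue) {
        return (clientValue == 0 || serverValue == 0)
                ? Math.max(clientValue, serverValue)
                : Math.min(clientValue, serverValue);
    }

    public static void main(String[] args) {
        // Client has no preference, server proposes 131072
        System.out.println(negotiatedMaxValue(0, 131072));
        // Both sides have a preference
        System.out.println(negotiatedMaxValue(60, 30));
    }
}
```

The negotiated values are then echoed back to the server in Connection.TuneOk, as the transmit call in start() shows.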

Next, let's see what the MainLoop thread does.

public void run() {
    try {
        while (_running) {
            Frame frame = _frameHandler.readFrame();
            // Handle the frame
            readFrame(frame);
        }
    } catch (Throwable ex) {
        handleFailure(ex);
    } finally {
        doFinalShutdown();
    }
}

// Body of readFrame(frame)
private void readFrame(Frame frame) throws IOException {
    if (frame != null) {
        _missedHeartbeats = 0;
        // Heartbeat frames are ignored
        if (frame.type == AMQP.FRAME_HEARTBEAT) {
            // Ignore it: we've already just reset the heartbeat counter.
        } else {
            if (frame.channel == 0) { // the special channel
                // Frame on the special channel 0
                _channel0.handleFrame(frame);
            } else {
                if (isOpen()) {
                    // If we're still _running, but not isOpen(), then we
                    // must be quiescing, which means any inbound frames
                    // for non-zero channels (and any inbound commands on
                    // channel zero that aren't Connection.CloseOk) must
                    // be discarded.
                    ChannelManager cm = _channelManager;
                    if (cm != null) {
                        ChannelN channel;
                        try {
                            channel = cm.getChannel(frame.channel);
                        } catch(UnknownChannelException e) {
                            // this can happen if channel has been closed,
                            // but there was e.g. an in-flight delivery.
                            // just ignoring the frame to avoid closing the whole connection
                            LOGGER.info("Received a frame on an unknown channel, ignoring it");
                            return;
                        }
                        // Dispatch the frame to its channel
                        channel.handleFrame(frame);
                    }
                }
            }
        }
    } else {
        // Socket timeout waiting for a frame.
        // Maybe missed heartbeat.
        handleSocketTimeout();
    }
}
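readFrame resets _missedHeartbeats on every frame and calls handleSocketTimeout() when the read times out, but the timeout side is not shown. The sketch below models how such a counter can detect a dead peer. The threshold of 2 * 4 rests on my recollection that the socket read timeout is set to a quarter of the heartbeat interval, so treat both the threshold and the class name MissedHeartbeatSketch as assumptions.

```java
final class MissedHeartbeatSketch {
    // Assumption: the read timeout is heartbeat / 4, so 8 timeouts are about two full intervals
    static final int MAX_MISSED = 2 * 4;

    private final int heartbeatSeconds;
    private int missed = 0;

    MissedHeartbeatSketch(int heartbeatSeconds) {
        this.heartbeatSeconds = heartbeatSeconds;
    }

    /** Any inbound frame, heartbeat or not, proves the peer is alive. */
    void onFrame() {
        missed = 0;
    }

    /** Called when a read timed out; returns true once the peer should be considered dead. */
    boolean onSocketTimeout() {
        if (heartbeatSeconds == 0) {
            return false; // heartbeats disabled, timeouts mean nothing
        }
        return ++missed > MAX_MISSED;
    }
}
```

This also explains why the heartbeat branch in readFrame is empty: resetting the counter at the top of the method is all a heartbeat frame needs to do.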

// channel.handleFrame; to be analyzed in more detail later
public void handleFrame(Frame frame) throws IOException {
    AMQCommand command = _command;
    if (command.handleFrame(frame)) { // a complete command has rolled off the assembly line
        _command = new AMQCommand(); // prepare for the next one
        handleCompleteInboundCommand(command);
    }
}
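handleFrame keeps feeding frames into the current command until command.handleFrame(frame) reports that the command is complete. In AMQP 0-9-1 a command is a method frame, optionally followed by a content header frame and as many body frames as are needed to reach the announced body size. A minimal sketch of such an assembler follows; the Frame class here is a hypothetical simplification (the real one carries raw bytes), and this is not the client's own assembler.

```java
import java.io.ByteArrayOutputStream;

final class CommandAssemblerSketch {
    // Frame type codes from the AMQP 0-9-1 specification
    static final int FRAME_METHOD = 1;
    static final int FRAME_HEADER = 2;
    static final int FRAME_BODY = 3;

    /** Hypothetical, heavily simplified frame. */
    static final class Frame {
        final int type;
        final boolean hasContent; // meaningful for method frames only
        final long bodySize;      // meaningful for header frames only
        final byte[] payload;     // meaningful for body frames only

        private Frame(int type, boolean hasContent, long bodySize, byte[] payload) {
            this.type = type;
            this.hasContent = hasContent;
            this.bodySize = bodySize;
            this.payload = payload;
        }
        static Frame method(boolean hasContent) { return new Frame(FRAME_METHOD, hasContent, 0, new byte[0]); }
        static Frame header(long bodySize) { return new Frame(FRAME_HEADER, false, bodySize, new byte[0]); }
        static Frame body(byte[] payload) { return new Frame(FRAME_BODY, false, 0, payload); }
    }

    private enum State { EXPECTING_METHOD, EXPECTING_HEADER, EXPECTING_BODY, COMPLETE }

    private State state = State.EXPECTING_METHOD;
    private long remainingBodyBytes;
    private final ByteArrayOutputStream body = new ByteArrayOutputStream();

    /** Consumes one frame; returns true once a complete command has been assembled. */
    boolean handleFrame(Frame frame) {
        switch (state) {
            case EXPECTING_METHOD:
                expect(frame, FRAME_METHOD);
                // Methods without content (e.g. Basic.Ack) are complete right away
                state = frame.hasContent ? State.EXPECTING_HEADER : State.COMPLETE;
                break;
            case EXPECTING_HEADER:
                expect(frame, FRAME_HEADER);
                remainingBodyBytes = frame.bodySize;
                state = remainingBodyBytes > 0 ? State.EXPECTING_BODY : State.COMPLETE;
                break;
            case EXPECTING_BODY:
                expect(frame, FRAME_BODY);
                remainingBodyBytes -= frame.payload.length;
                if (remainingBodyBytes < 0) {
                    throw new IllegalStateException("body is larger than announced in the header");
                }
                body.write(frame.payload, 0, frame.payload.length);
                if (remainingBodyBytes == 0) {
                    state = State.COMPLETE;
                }
                break;
            default:
                throw new IllegalStateException("command is already complete");
        }
        return state == State.COMPLETE;
    }

    byte[] bodyBytes() {
        return body.toByteArray();
    }

    private static void expect(Frame frame, int type) {
        if (frame.type != type) {
            throw new IllegalStateException("unexpected frame type " + frame.type);
        }
    }
}
```

Once a command is complete, the channel swaps in a fresh assembler for the next command, which is what `_command = new AMQCommand()` does in the listing above.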

Summary:

connectionFactory.newConnection() - AutorecoveringConnection.init() - RecoveryAwareAMQConnectionFactory.newConnection() - RecoveryAwareAMQConnection.start()

Steps:

  • connectionFactory assembles the FrameHandlerFactory, MetricsCollector, and ConnectionParams

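  • Construct the AutorecoveringConnection instance. Its constructor receives the FrameHandlerFactory, the ConnectionParams, the metricsCollector, and the connection address information, and uses them to build the RecoveryAwareAMQConnectionFactory instance cf.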

  • Initialize the A-conn through AutorecoveringConnection.init() (A- stands for AutorecoveringConnection; R- below stands for RecoveryAwareAMQConnection)

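    • Use the cf held by the A-conn to create the RecoveryAwareAMQConnection instance delegate
      • Obtain the FrameHandler, the handle used to read and write each frame
      • Pass the connection parameters (ConnectionParams), the FrameHandler, and the extension component metricsCollector to the RecoveryAwareAMQConnection constructor to obtain the R-conn instance
      • R-conn.start() starts the connection and returns
    • Register the automatic recovery listener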

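Afterword: this was fairly hard going. I will probably need to read this source two more times before I really understand even part of it. For now this is only a partial walkthrough, and there are certainly omissions.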
