MQ!Rabbit-client command中有一个小尾巴没有处理,看下面的代码
public void handleFrame(Frame frame) throws IOException {
AMQCommand command = _command;
if (command.handleFrame(frame)) { // a complete command has rolled off the assembly line
_command = new AMQCommand(); // prepare for the next one
handleCompleteInboundCommand(command);
}
}
这段代码是rabbitmq处理帧的操作,当时分析了command.handleFrame(frame)
这个方法实际上就是调用CommandAssembler.handleFrame()
。这里我们来下面的方法handleCompleteInboundCommand(command);
// handle a command which has been assembled
// 处理已组装好的命令
public void handleCompleteInboundCommand(AMQCommand command) throws IOException {
// First, offer the command to the asynchronous-command
// handling mechanism, which gets to act as a filter on the
// incoming command stream. If processAsync() returns true,
// the command has been dealt with by the filter and so should
// not be processed further. It will return true for
// asynchronous commands (deliveries/returns/other events),
// and false for commands that should be passed on to some
// waiting RPC continuation.
// 过滤掉已经处理的命令
if (!processAsync(command)) {
// The filter decided not to handle/consume the command,
// so it must be a response to an earlier RPC.
if (_checkRpcResponseType) {
synchronized (_channelMutex) {
// check if this reply command is intended for the current waiting request before calling nextOutstandingRpc()
if (_activeRpc != null && !_activeRpc.canHandleReply(command)) {
// this reply command is not intended for the current waiting request
// most likely a previous request timed out and this command is the reply for that.
// Throw this reply command away so we don't stop the current request from waiting for its reply
return;
}
}
}
final RpcWrapper nextOutstandingRpc = nextOutstandingRpc();
// the outstanding RPC can be null when calling Channel#asyncRpc
if(nextOutstandingRpc != null) {
nextOutstandingRpc.complete(command);
markRpcFinished();
}
}
}
敲黑板了!!!
processAsync
这个方法的实现是在ChnnelN
中(终于绕到ChannelN
上了 )。
另外这个方法也是AMQChannel
中唯一的抽象方法
ChannelN
是整个RabbitMQ客户端最核心的一个类了✨
ChannelN
的成员变量private final Map<String, Consumer> _consumers = Collections.synchronizedMap(new HashMap<String, Consumer>());
private volatile Consumer defaultConsumer = null;
private final ConsumerDispatcher dispatcher;
private final Collection<ReturnListener> returnListeners = new CopyOnWriteArrayList<ReturnListener>();
private final Collection<FlowListener> flowListeners = new CopyOnWriteArrayList<FlowListener>();
private volatile CountDownLatch finishedShutdownFlag = null;
private final Collection<ConfirmListener> confirmListeners = new CopyOnWriteArrayList<ConfirmListener>();
private long nextPublishSeqNo = 0L;
private final SortedSet<Long> unconfirmedSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
private volatile boolean onlyAcksReceived = true;
原文注释都被我干掉了
@Override public boolean processAsync(Command command) throws IOException
{
Method method = command.getMethod();
// 方法帧 Channel.Close,异步关闭
if (method instanceof Channel.Close) {
asyncShutdown(command);
return true;
}
// 链接打开中
if (isOpen()) {
if (method instanceof Basic.Deliver) {
processDelivery(command, (Basic.Deliver) method);
return true;
} else if (method instanceof Basic.Return) {
callReturnListeners(command, (Basic.Return) method);
return true;
} else if (method instanceof Channel.Flow) {
Channel.Flow channelFlow = (Channel.Flow) method;
synchronized (_channelMutex) {
_blockContent = !channelFlow.getActive();
transmit(new Channel.FlowOk(!_blockContent));
_channelMutex.notifyAll();
}
return true;
} else if (method instanceof Basic.Ack) {
Basic.Ack ack = (Basic.Ack) method;
callConfirmListeners(command, ack);
handleAckNack(ack.getDeliveryTag(), ack.getMultiple(), false);
return true;
} else if (method instanceof Basic.Nack) {
Basic.Nack nack = (Basic.Nack) method;
callConfirmListeners(command, nack);
handleAckNack(nack.getDeliveryTag(), nack.getMultiple(), true);
return true;
} else if (method instanceof Basic.RecoverOk) {
for (Map.Entry<String, Consumer> entry : Utility.copy(_consumers).entrySet()) {
this.dispatcher.handleRecoverOk(entry.getValue(), entry.getKey());
}
return false;
} else if (method instanceof Basic.Cancel) {
Basic.Cancel m = (Basic.Cancel)method;
String consumerTag = m.getConsumerTag();
Consumer callback = _consumers.remove(consumerTag);
if (callback == null) {
callback = defaultConsumer;
}
if (callback != null) {
try {
this.dispatcher.handleCancel(callback, consumerTag);
} catch (Throwable ex) {
getConnection().getExceptionHandler().
handleConsumerException(this, ex, callback,consumerTag,"handleCancel");
}
}
return true;
} else {
return false;
}
} else {
// Channel.CloseOk方法
if (method instanceof Channel.CloseOk) {
return false;
} else {
return true;
}
}
}
这个方法主要用来针对接受到broker
的AMQCommand
进行进一步的处理,至于怎么接受Socket,怎么封装成帧,怎么确定一个AMQComand已经封装完毕,都已在调用此方法前完成。此方法可以处理:Channel.Close
, Basic.Deliver
, Basic.Return
, Channel.Flow
, Basic.Ack
, Basic.Nack
,Basic.RecoverOk
, Basic.Cancel
, Channel.CloseOk
等。
那来吧开始打怪了濫。
// Basic.Deliver
processDelivery(command, (Basic.Deliver) method);
protected void processDelivery(Command command, Basic.Deliver method) {
Basic.Deliver m = method;
Consumer callback = _consumers.get(m.getConsumerTag());
if (callback == null) {
if (defaultConsumer == null) {
// No handler set. We should blow up as this message
// needs acking, just dropping it is not enough. See bug
// 22587 for discussion.
throw new IllegalStateException("Unsolicited delivery -" +
" see Channel.setDefaultConsumer to handle this" +
" case.");
} else {
callback = defaultConsumer;
}
}
Envelope envelope = new Envelope(m.getDeliveryTag(),
m.getRedelivered(),
m.getExchange(),
m.getRoutingKey());
try {
// call metricsCollector before the dispatching (which is async anyway)
// this way, the message is inside the stats before it is handled
// in case a manual ack in the callback, the stats will be able to record the ack
metricsCollector.consumedMessage(this, m.getDeliveryTag(), m.getConsumerTag());
// ConsumerWorkService实例
this.dispatcher.handleDelivery(callback,
m.getConsumerTag(),
envelope,
(BasicProperties) command.getContentHeader(),
command.getContentBody());
} catch (Throwable ex) {
getConnection().getExceptionHandler().handleConsumerException(this,
ex,
callback,
m.getConsumerTag(),
"handleDelivery");
}
}
在学习ConsumerWorkService
里面留了一个尾巴就是ConsumerDispatcher
。这里会进行补充吧。分析下上面代码里面的this.dispatcher.handleDelivery
public void handleDelivery(final Consumer delegate,
final String consumerTag,
final Envelope envelope,
final AMQP.BasicProperties properties,
final byte[] body) throws IOException {
executeUnlessShuttingDown(
new Runnable() {
@Override
public void run() {
try {
delegate.handleDelivery(consumerTag,
envelope,
properties,
body);
} catch (Throwable ex) {
connection.getExceptionHandler().handleConsumerException(
channel,
ex,
delegate,
consumerTag,
"handleDelivery");
}
}
});
}
不要看里面Runnable
的实现看外面套的那层方法:
private void executeUnlessShuttingDown(Runnable r) {
if (!this.shuttingDown) execute(r);
}
private void execute(Runnable r) {
checkShutdown();
this.workService.addWork(this.channel, r);
}
这里就可以看到最后其实还是将需要处理的任务封装为一个Runnable
然后调用了workService.addWork
。
所以说其实ConsumerDispatcher
其实就是把任务分发给workPool
处理。
什么?不知道workService.addWork
是干什么的! 嘿嘿,去帮我刷下浏览量吧 MQ!Rabbit-client ConsumerWorkService
★☆★☆★☆★☆★☆★☆ ⊙◎○●”·. 可爱的分割线 .·“●○◎⊙ ★☆★☆★☆★☆★☆★☆
上面已经跑偏了,这里主要是要学习ChannelN
的。来继续看下其他方法吧
@Override
public void basicQos(int prefetchSize, int prefetchCount, boolean global)
throws IOException{
exnWrappingRpc(new Basic.Qos(prefetchSize, prefetchCount, global));
}
// AMQImpl
public Qos(int prefetchSize, int prefetchCount, boolean global) {
this.prefetchSize = prefetchSize;
this.prefetchCount = prefetchCount;
this.global = global;
}
好吧,这个源码我是看不出来有什么了,只好找网上的说法了。
消费者在开启ACK的情况下,对接受到的消息可以根据业务的需要异步对消息进行确认。
然而在实际使用过程中,由于消费者自身处理能力有限,从RabbitMQ获取一定数量的消息后,希望rabbitmq不再将队列中的消息推送过来,当对消息处理完后(即对消息进行了ack,并且有能力处理更多的消息)再接受来自队列的消息。在这种场景下,我们可以设置Basic.Qos中的prefetch_count来达到这个效果。
发送Basic.Consume
帧,然后等待Basic.ConsumeOk
帧。待收到broker端的Basic.ConsumeOk
帧之后,触发BlockingRpcContinuation
中的transformReply()
方法
当发送Basic.Consume帧之后,由broker返回的是Basic.ConsumeOk帧+Basic.Deliver帧,Basic.ConsumerOk帧由下面方法处理,Basic.Deliver帧由processAsync处理。
@Override
public String basicConsume(String queue, final boolean autoAck, String consumerTag,
boolean noLocal, boolean exclusive, Map<String, Object> arguments,
final Consumer callback)
throws IOException {
final Method m = new Basic.Consume.Builder()
.queue(queue)
.consumerTag(consumerTag)
.noLocal(noLocal)
.noAck(autoAck)
.exclusive(exclusive)
.arguments(arguments)
.build();
BlockingRpcContinuation<String> k = new BlockingRpcContinuation<String>(m) {
@Override
public String transformReply(AMQCommand replyCommand) {
String actualConsumerTag = ((Basic.ConsumeOk) replyCommand.getMethod()).getConsumerTag();
_consumers.put(actualConsumerTag, callback);
// need to register consumer in stats before it actually starts consuming
metricsCollector.basicConsume(ChannelN.this, actualConsumerTag, autoAck);
dispatcher.handleConsumeOk(callback, actualConsumerTag);
return actualConsumerTag;
}
};
rpc(m, k);
try {
if(_rpcTimeout == NO_RPC_TIMEOUT) {
return k.getReply();
} else {
try {
return k.getReply(_rpcTimeout);
} catch (TimeoutException e) {
throw wrapTimeoutException(m, e);
}
}
} catch(ShutdownSignalException ex) {
throw wrap(ex);
}
}
基本上就是客户端发送Basic.Get至Broker,Broker返回Basic.GetOK并携带数据。注意方法最后返回GetResponse对象,这个对象就是包装了一下数据。
public GetResponse basicGet(String queue, boolean autoAck)
throws IOException
{
AMQCommand replyCommand = exnWrappingRpc(new Basic.Get.Builder()
.queue(queue)
.noAck(autoAck)
.build());
Method method = replyCommand.getMethod();
if (method instanceof Basic.GetOk) {
Basic.GetOk getOk = (Basic.GetOk)method;
Envelope envelope = new Envelope(getOk.getDeliveryTag(),
getOk.getRedelivered(),
getOk.getExchange(),
getOk.getRoutingKey());
BasicProperties props = (BasicProperties)replyCommand.getContentHeader();
byte[] body = replyCommand.getContentBody();
int messageCount = getOk.getMessageCount();
return new GetResponse(envelope, props, body, messageCount);
} else if (method instanceof Basic.GetEmpty) {
return null;
} else {
throw new UnexpectedMethodError(method);
}
}
/** Public API - {@inheritDoc} */
public Tx.SelectOk txSelect()
throws IOException
{
return (Tx.SelectOk) exnWrappingRpc(new Tx.Select()).getMethod();
}
/** Public API - {@inheritDoc} */
public Tx.CommitOk txCommit()
throws IOException
{
return (Tx.CommitOk) exnWrappingRpc(new Tx.Commit()).getMethod();
}
/** Public API - {@inheritDoc} */
public Tx.RollbackOk txRollback()
throws IOException
{
return (Tx.RollbackOk) exnWrappingRpc(new Tx.Rollback()).getMethod();
}
上面几个方法参考博客:
https://blog.csdn.net/u013256816/article/details/70214863