服务端启动:`MqServer.main(args);` → `server = new MqServer(configFile);`。
server 创建的时候,会进行 `MqServerAdaptor` 的初始化,主要初始化了 `SubscriptionManager`、`MessageDispatcher`、`UrlRouteFilter`,并加载了一系列的 Handler。
然后是 `MqServer#start`。
`NettyAdaptor` 继承了 `ChannelInboundHandlerAdapter`,当有消息过来的时候,会执行 `ioAdaptor.onMessage(msg, sess);`,
并根据服务端接收到的消息里面的 `cmd` 属性,获取对应的 handler 处理消息。
CreateHandler#handle
// MqManager#saveQueue()
// Look up the queue by name; a new one is created only when no entry exists.
// NOTE(review): the get-then-put sequence is not atomic on its own — confirm
// that the enclosing method or mqTable provides the needed synchronization.
MessageQueue mq = mqTable.get(mqName);
if(mq == null) {
// mqType selects the implementation: in-memory or disk-backed.
if(Protocol.MEMORY.equals(mqType)) {
mq = new MemoryQueue(mqName, creator);
} else if (Protocol.DISK.equals(mqType)) {
// The disk queue is additionally given the directory named by mqDir.
mq = new DiskQueue(mqName, new File(mqDir), creator);
} else {
// Any other type value is rejected.
throw new IllegalArgumentException("mqType(" + mqType + ") Not Support");
}
// Register the newly created queue under its name.
mqTable.put(mqName, mq);
}
PubHandler#handle
mq.write(req)
; public void write(Object... data) {
// Stores every element of data into the backing array, in argument order.
// The whole batch is written while holding the monitor of array.
synchronized (array) {
for (Object obj : data) {
// The slot is end modulo maxSize, so writes wrap around the array.
int i = (int) (end % maxSize);
array[i] = obj;
// forwardIndex() is invoked once per stored element; its return value is ignored here.
forwardIndex();
}
}
}
/**
 * Writes the given messages to the current write block while holding {@code writeLock}.
 *
 * <p>An empty argument list returns immediately without taking the lock. If the
 * current block reports a non-positive count, that block is closed, a new one is
 * obtained from {@code index.createWriteBlock()}, and the write is issued once more
 * against the new block.
 * NOTE(review): a non-positive count presumably means the block is full — confirm.
 * The result of the second write is not inspected.
 *
 * @param data messages to write
 * @throws IOException if the underlying block operations fail
 */
public void write(DiskMessage... data) throws IOException{
    if (data.length == 0) {
        return;
    }
    writeLock.lock();
    try {
        final int written = writeBlock.write(data);
        if (written > 0) {
            return;
        }
        // Nothing was accepted by the current block: roll over to a new block and retry once.
        writeBlock.close();
        writeBlock = index.createWriteBlock();
        writeBlock.write(data);
    } finally {
        writeLock.unlock();
    }
}
消息分发器处理该队列:`messageDispatcher.dispatch(mq);`。
分发时先获取 mq 的 channel 列表,再从每个 channel 读取队列中的消息(`message = mq.read(channel);`),写回客户端。
`SubHandler#handle` 的核心就是 `messageDispatcher.dispatch(mq, channelName);`,
即该分发器只针对指定的 channel 进行分发。
客户端与服务端之间使用的是 WebSocket 连接,客户端通过它给服务端发送消息:
执行 `Client#invoke()` 发送 Msg。
// Build a subscribe request for the given mq/channel pair.
Message sub = new Message();
sub.setHeader("cmd", "sub"); //Subscribe on MQ/Channel
sub.setHeader("mq", mq);
sub.setHeader("channel", channel);
// NOTE(review): "window" looks like a flow-control size (how many messages may be
// pushed before the subscription is renewed) — confirm against the server side.
sub.setHeader("window", 1);
//sub.setHeader("filter", "abc");
// Send the request; the callback prints whatever it is invoked with.
client.invoke(sub, data->{
System.out.println(data);
});
WebsocketClient#sendMessage
/**
 * Sends a text command to the server as a WebSocket text frame.
 *
 * <p>When no channel is available yet, the command is appended to
 * {@code cachedSendingMessages}, {@code connect()} is called, and nothing is
 * written during this call.
 * NOTE(review): the cached commands are presumably flushed once the connection
 * is established — confirm in the connect path.
 *
 * <p>The payload is encoded as UTF-8 explicitly. The no-argument
 * {@code String.getBytes()} uses the platform default charset, which would
 * corrupt non-ASCII text on hosts whose default is not UTF-8, and WebSocket
 * text frames are required to carry UTF-8.
 *
 * @param command text payload to send
 */
public void sendMessage(String command){
    synchronized (connectLock) {
        // No channel yet: remember the command and try to connect to the server.
        if (this.channel == null) {
            this.cachedSendingMessages.add(command);
            this.connect();
            return;
        }
        byte[] payload = command.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        ByteBuf buf = Unpooled.wrappedBuffer(payload);
        WebSocketFrame frame = new TextWebSocketFrame(buf);
        // Send the frame to the server.
        this.channel.writeAndFlush(frame);
    }
}
// Build a publish request targeting queue mq, with i as the body.
Message msg = new Message();
msg.setHeader("cmd", "pub"); //Publish
msg.setHeader("mq", mq);
msg.setBody(i);
// Send it; the callback prints its argument only when the running counter
// is a multiple of 10000, i.e. for every 10000th invocation.
client.invoke(msg, res->{
if(count.getAndIncrement() % 10000 == 0) {
System.out.println(res);
}
});
/**
 * Stores the given objects into the backing array, in argument order.
 *
 * <p>The whole batch is written while holding the monitor of {@code array}.
 * Each element goes into slot {@code end % maxSize}, after which
 * {@code forwardIndex()} is called once.
 *
 * @param data objects to store
 */
public void write(Object... data) {
    synchronized (array) {
        for (int n = 0; n < data.length; n++) {
            final int slot = (int) (end % maxSize);
            array[slot] = data[n];
            forwardIndex();
        }
    }
}
// Create a server from a default-constructed config, and a client built directly
// from that server object.
// NOTE(review): presumably an in-process connection with no network hop — confirm.
MqServer server = new MqServer(new MqServerConfig());
MqClient client = new MqClient(server);
// Configure the client heartbeat with a value of 30 seconds.
client.heartbeat(30, TimeUnit.SECONDS);
final String mq = "MyMQ", channel = "MyChannel";
AtomicInteger count = new AtomicInteger(0);
// Register a handler for the mq/channel pair; it prints its argument only when
// the running counter is a multiple of 10000.
client.addMqHandler(mq, channel, data->{
if(count.getAndIncrement() % 10000 == 0) {
System.out.println(data);
}
});
//MqClient#handleMessage
/**
 * Routes one incoming message.
 *
 * <p>The message is first offered to {@code handleInvokeResponse}; if that reports
 * it as handled, nothing else happens. Otherwise it is treated as a pushed
 * subscription message: its MQ and CHANNEL headers select the registered
 * {@code MqHandler}, whose callback is invoked with the message. Exceptions thrown
 * by the callback are logged and do not propagate.
 *
 * <p>Afterwards, if the message carries a WINDOW header whose value is at most half
 * of the handler's configured window, a new SUB message carrying the handler's
 * window (with ACK set to false) is sent. Failures while sending are logged.
 *
 * @param response the message to route
 * @throws Exception declared by the signature; see the callees for what may propagate
 */
private void handleMessage(Message response) throws Exception {
    if (handleInvokeResponse(response)) {
        return;
    }

    // Not an invoke response: treat it as a pushed subscription message.
    final String mqName = (String) response.getHeader(Protocol.MQ);
    final String channelName = (String) response.getHeader(Protocol.CHANNEL);
    if (mqName == null || channelName == null) {
        logger.warn("MQ/Channel both required in reponse: " + JsonKit.toJSONString(response));
        return;
    }

    final MqHandler mqHandler = getHandler(mqName, channelName);
    if (mqHandler == null) {
        logger.warn(String.format("Missing handler for mq=%s,channel=%s", mqName, channelName));
        return;
    }

    try {
        mqHandler.handler.handle(response);
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }

    // Renew the subscription window once the reported window has dropped to half or less.
    final Integer window = response.getHeaderInt(Protocol.WINDOW);
    final boolean renewWindow = window != null && window <= mqHandler.window / 2;
    if (renewWindow) {
        try {
            Message renewal = new Message();
            renewal.setHeader(Protocol.CMD, Protocol.SUB);
            renewal.setHeader(Protocol.MQ, mqName);
            renewal.setHeader(Protocol.CHANNEL, channelName);
            renewal.setHeader(Protocol.WINDOW, mqHandler.window);
            renewal.setHeader(Protocol.ACK, false);
            this.sendMessage(renewal);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }
}
/**
 * Reads the next message for this channel from the in-memory queue.
 *
 * <p>Runs while holding the monitor of {@code queue.array}. A channel whose offset
 * is below {@code queue.start} is first moved up to {@code queue.start}. If the
 * offset has reached {@code queue.end} there is nothing to read and {@code null}
 * is returned. Otherwise a new {@code Message} is constructed from the array
 * element at {@code offset % maxSize}, tagged with the OFFSET header, and the
 * channel offset is advanced by one.
 *
 * @return a new message carrying the offset header, or {@code null} when the
 *         channel has no unread message
 * @throws IOException declared by the signature
 */
@Override
public Message read() throws IOException {
    synchronized (queue.array) {
        if (channel.offset < queue.start) {
            channel.offset = queue.start;
        }
        if (channel.offset >= queue.end) {
            return null;
        }
        final int slot = (int) (channel.offset % queue.maxSize);
        final Message copy = new Message((Message) queue.array[slot]);
        copy.setHeader(Protocol.OFFSET, channel.offset); // record the offset that was read
        channel.offset++;
        return copy;
    }
}
`CircularArray`:当队列长度设置为 10,而生产者发送了超过 10 条数据时,`end` 其实是比 10 大的。
消息实际存放的数组下标为 `int i = (int) (end % maxSize);`
。start不变,end是随着消息的增长保持递增。当end>maxSize,开始增长start。start表示队列消费的开始,任何的channel都从start开始消费,意味着如果随着消息的增多,并且消息的数量超过了队列的长度,just开始移除最开始的消息。 private int forwardIndex() {
// Advances the write position by one and returns the new end modulo maxSize.
// When end - start has already reached maxSize, start is advanced as well,
// so the distance between start and end does not grow past maxSize.
if (end - start >= maxSize) {
start++;
}
end++;
return (int) (end % maxSize);
}
读取时同样对 channel 的 offset 取模得到数组下标:`int idx = (int) (channel.offset % queue.maxSize);`
。 @Override
public Message read() throws IOException {
// Returns a new Message built from the next unread element for this channel,
// or null when the channel offset has reached queue.end.
synchronized (queue.array) {
// A channel never reads from before start: an offset that has fallen behind is moved up to start.
if (channel.offset < queue.start) {
channel.offset = queue.start;
}
Message res = null;
// Read only while the channel has not caught up with end.
if (channel.offset < queue.end) {
// Map the ever-growing offset onto an array slot.
int idx = (int) (channel.offset % queue.maxSize);
// The offset header is set on res, the newly constructed Message.
res = new Message((Message)queue.array[idx]);
res.setHeader(Protocol.OFFSET, channel.offset); //Add offset
channel.offset++;
}
return res;
}
}