当前位置: 首页 > 工具软件 > zbus > 使用案例 >

【ZBus系列】分析ZBus源码,思考消息服务框架应该如何设计

晏永康
2023-12-01

【ZBus系列】分析ZBus源码,思考消息服务框架应该如何设计

消息队列

规则设计

  1. 队列消息数据单份,分组通道任意多个
  2. 分组通道之内的消费者共享分组读指针,读指针改变影响本分组通道上的所有消费者
  3. 分组通道之间的消费者互不影响
  4. 分组通道的支持消息过滤,基于消息头部Tag

消息模式

  1. 单播 仅一个分组通道,所有的消费者共享一个分组通道,每条消息只送达其中一个消费者
  2. 广播 每个消费一个分组通道,每条消息抵达所有的消费者
  3. 组播 单播与广播的混合,多个分组通道,每个分组通道上多个消费者负载均衡
  4. 订阅模式 广播的分组上设置消息过滤主题(注意区别队列主题Topic)

服务端启动流程

  1. 入口,MqServer.main(args);
  2. 加载zbus.xml配置文件。
  3. 根据配置文件创建server,server = new MqServer(configFile); 。server创建的时候,会进行MqServerAdaptor的初始化,主要初始化了SubscriptionManager,MessageDispatcher,UrlRouteFilter,加载了一系列的Handler。
  4. MqServer启动,MqServer#start

服务端的消息处理

NettyAdaptor实现了ChannelInboundHandlerAdapter,当有消息过来的时候,会执行ioAdaptor.onMessage(msg, sess);;根据服务端的接收到的消息里面的cmd属性,获取对应的handler处理消息。

各类型的消息处理

创建队列

  1. CreateHandler#handle
// MqManager#saveQueue()
MessageQueue mq = mqTable.get(mqName); 
		if(mq == null) {
			if(Protocol.MEMORY.equals(mqType)) {
				mq = new MemoryQueue(mqName, creator);
			} else if (Protocol.DISK.equals(mqType)) {
				mq = new DiskQueue(mqName, new File(mqDir), creator);
			} else {
				throw new IllegalArgumentException("mqType(" + mqType + ") Not Support");
			}  
			mqTable.put(mqName, mq);
		}

发布消息

PubHandler#handle

  1. 消息写入服务端,mq.write(req);
  • mq类型是内存,写入数组
	public void write(Object... data) {
		synchronized (array) {
			for (Object obj : data) {
				int i = (int) (end % maxSize);
				array[i] = obj;
				forwardIndex();
			}
		}
	}  
  • mq类型是磁盘
	public void write(DiskMessage... data) throws IOException{
		if(data.length == 0) return;
		
		writeLock.lock();
		try{ 
			int count = writeBlock.write(data);
			if(count <= 0){
				writeBlock.close();
				writeBlock = index.createWriteBlock(); 
				writeBlock.write(data);
			}
		}
		finally {
			writeLock.unlock();
		}
	} 
  1. 消息分发器处理该队列, messageDispatcher.dispatch(mq);

    获取mq的chanal列表;管道中读取队列中消息,message = mq.read(channel);写回客户端。

订购消息

SubHandler#handle,核心就是messageDispatcher.dispatch(mq, channelName); 。该分发器指定channel。

客户端发送消息

服务端是用的webSocket连接,给服务端发送消息。

执行Client#invoke()发送Msg

			Message sub = new Message();
			sub.setHeader("cmd", "sub"); //Subscribe on MQ/Channel
			sub.setHeader("mq", mq); 
			sub.setHeader("channel", channel);
			sub.setHeader("window", 1);
			//sub.setHeader("filter", "abc");
			
			client.invoke(sub, data->{
				System.out.println(data);
			});

WebsocketClient#sendMessage

	public void sendMessage(String command){
		synchronized (connectLock) {
            //如果chanel为null,尝试连接服务端
			if(this.channel == null){
				this.cachedSendingMessages.add(command);
				this.connect();
				return;
			} 
			ByteBuf buf = Unpooled.wrappedBuffer(command.getBytes());
			WebSocketFrame frame = new TextWebSocketFrame(buf);
            //将消息数据发送给服务端
			this.channel.writeAndFlush(frame);
		}  
	} 

收发消息

生产者发送消息

			Message msg = new Message();
			msg.setHeader("cmd", "pub"); //Publish
			msg.setHeader("mq", mq);
			msg.setBody(i);  
			
			client.invoke(msg, res->{
				if(count.getAndIncrement() % 10000 == 0) {
					System.out.println(res); 
				}
			});

服务端存储消息

	public void write(Object... data) {
		synchronized (array) {
			for (Object obj : data) {
				int i = (int) (end % maxSize);
				array[i] = obj;
				forwardIndex();
			}
		}
	} 

客户端消费消息

  1. 如果是消费者,消费完一条消息后,继续发送cmd=sub的Message
		MqServer server = new MqServer(new MqServerConfig());
		MqClient client = new MqClient(server);    
		client.heartbeat(30, TimeUnit.SECONDS);
		
		final String mq = "MyMQ", channel = "MyChannel";
		AtomicInteger count = new AtomicInteger(0);  
		client.addMqHandler(mq, channel, data->{
			if(count.getAndIncrement() % 10000 == 0) {
				System.out.println(data); 
			} 
		});  
		//MqClient#handleMessage	
private void handleMessage(Message response) throws Exception {
		boolean handled = handleInvokeResponse(response);
		if(handled) return; 
		
		//Subscribed message pushing 
		String mq = (String)response.getHeader(Protocol.MQ);
		String channel = (String)response.getHeader(Protocol.CHANNEL);
		if(mq == null || channel == null) {
			logger.warn("MQ/Channel both required in reponse: " + JsonKit.toJSONString(response));
			return;
		} 
		MqHandler mqHandler = getHandler(mq, channel); 
		if(mqHandler == null) {
			logger.warn(String.format("Missing handler for mq=%s,channel=%s",mq, channel));
			return;
		} 
		try {
			mqHandler.handler.handle(response); 
		} catch (Exception e) {
			logger.error(e.getMessage(), e);
		} 
		//increase window if required
		Integer window = response.getHeaderInt(Protocol.WINDOW);
		if(window != null) { 
			if(window <= mqHandler.window/2) {
				try {
					Message sub = new Message();
					sub.setHeader(Protocol.CMD, Protocol.SUB);
					sub.setHeader(Protocol.MQ, mq);
					sub.setHeader(Protocol.CHANNEL, channel);
					sub.setHeader(Protocol.WINDOW, mqHandler.window);
					sub.setHeader(Protocol.ACK, false);
					
					this.sendMessage(sub); 
				} catch (Exception e) {
					logger.error(e.getMessage(), e);
				}
			}
		} 
	}

服务端返回消息

	@Override
	public Message read() throws IOException {
		synchronized (queue.array) { 
			if (channel.offset < queue.start) {
				channel.offset = queue.start;
			} 
			Message res = null;
			if (channel.offset < queue.end) {
				int idx = (int) (channel.offset % queue.maxSize);
				res = new Message((Message)queue.array[idx]);
				res.setHeader(Protocol.OFFSET, channel.offset); //Add offset
				channel.offset++; 
			}
			return res;
		} 
	}

思考总结

  1. 因为队列用的是CircularArray,当队列长度设置为10,而生产者发送了超过10条数据,end其实是比10大的。存放在消息的int i = (int) (end % maxSize);。start不变,end是随着消息的增长保持递增。当end>maxSize,开始增长start。start表示队列消费的开始,任何的channel都从start开始消费,意味着如果随着消息的增多,并且消息的数量超过了队列的长度,就开始移除最开始的消息。
	private int forwardIndex() {
		if (end - start >= maxSize) {
			start++;
		}
		end++;
		return (int) (end % maxSize);
	}
  1. channel是根据offset获取到对应的消息的,但是必须从start开始消费。获取消息的索引,int idx = (int) (channel.offset % queue.maxSize);
	@Override
	public Message read() throws IOException {
		synchronized (queue.array) { 
            //所有的channel都是从start开始消费
			if (channel.offset < queue.start) {
				channel.offset = queue.start;
			} 
			Message res = null;
			if (channel.offset < queue.end) {
				int idx = (int) (channel.offset % queue.maxSize);
				res = new Message((Message)queue.array[idx]);
				res.setHeader(Protocol.OFFSET, channel.offset); //Add offset
				channel.offset++; 
			}
			return res;
		} 
	}
 类似资料: