当前位置: 首页 > 工具软件 > DDPush > 使用案例 >

DDPush开源推送框架源码分析之Client到DDPush(UDP模式)

韩靖琪
2023-12-01
在前一篇文章中我们主要分析了AppServer是如何连接到DDPush,并向DDPush推送消息,还没有看过的朋友请移步 DDPush开源推送框架源码分析之APPServer到DDPush


本篇文章主要讲解Client(客户端)如何连接到DDPush,并向DDPush发送消息(主要是心跳包和确认信息),和如何接收APPServer推送给DDPush的消息,本篇文章分析官方推荐的UDP工作模式。


UDP模式主要涉及到以下几个重要的类:

1、UdpConnector 监听端口的UDP数据包

2、Receiver 接受终端消息

3、Sender 向终端发送消息

4、Messenger 从Receiver的消息队列中取出消息, 再从内存中查找对应的状态机以及需要发送给终端的消息, 最后加入到Sender的消息队列并由Sender发出去


一、UdpConnector

关键方法start()

public void start() throws Exception{
		if(antenna != null){
			throw new Exception("antenna is not null, may have run before");
		}
		antenna = DatagramChannel.open();
		antenna.socket().bind(new InetSocketAddress(port));
		System.out.println("udp connector port:"+port);
		//non-blocking
		antenna.configureBlocking(false);
		antenna.socket().setReceiveBufferSize(1024*1024*PropertyUtil.getPropertyInt("CLIENT_UDP_BUFFER_RECEIVE"));
		antenna.socket().setSendBufferSize(1024*1024*PropertyUtil.getPropertyInt("CLIENT_UDP_BUFFER_SEND"));
		System.out.println("udp connector recv buffer size:"+antenna.socket().getReceiveBufferSize());
		System.out.println("udp connector send buffer size:"+antenna.socket().getSendBufferSize());
		
		
		this.receiver = new Receiver(antenna);
		this.receiver.init();
		this.sender = new Sender(antenna);
		this.sender.init();
		
		this.senderThread = new Thread(sender,"AsynUdpConnector-sender");
		this.receiverThread = new Thread(receiver,"AsynUdpConnector-receiver");
		this.receiverThread.start();
		this.senderThread.start();
	}
第6行:绑定一个端口端口,监听此端口的数据包

第16行:receiver是一个runnable对象,用来接受Client发来的消息

第19行:sender是一个runnable对象,用来向Client推送消息

第21-24行:分别启动receiver和sender


二、Receiver

第一步:run()方法

public void run(){
		while(!this.stoped){
			try{
				//synchronized(enQueSignal){
					processMessage();
				//	if(mq.isEmpty() == true){
				//		enQueSignal.wait();
				//	}
				//}
			}catch(Exception e){
				e.printStackTrace();
			}catch(Throwable t){
				t.printStackTrace();
			}
		}
	}
这是一个while循环,用来不断的接收消息,主要看第5行的processMessage()方法

第二步:processMessage()方法

protected void processMessage() throws Exception{
		address = null;
		buffer.clear();
		try{
			address = this.channel.receive(buffer);
		}catch(SocketTimeoutException timeout){
			
		}
		if(address == null){
			try{
				Thread.sleep(1);
			}catch(Exception e){
				
			}
			return;
		}
		
		buffer.flip();
		byte[] swap = new byte[buffer.limit() - buffer.position()];
		System.arraycopy(buffer.array(), buffer.position(), swap, 0, swap.length);

		ClientMessage m = new ClientMessage(address,swap);
		
		enqueue(m);
		//System.out.println(DateTimeUtil.getCurDateTime()+" r:"+StringUtil.convert(m.getData())+" from:"+m.getSocketAddress().toString());

	}
第5行:接收数据填充到buffer,因为DatagramChannel设置成非阻塞了,所以此方法不管有无数据都会立即返回,所以第9行会做一个判断

第19-20:将buffer缓冲区的数据拷贝到swap数据

第22行:将byte[]数组和Client的address信息封装成ClientMessage对象

第24行:将ClientMessage对象加入到消息队列,等待处理(Messenger会从此队列取消息)


三、Sender

第一步:run()方法

public void run(){
		while(!this.stoped){
			try{
				synchronized(enQueSignal){
					while(mq.isEmpty() == true && stoped == false){
						try{
							enQueSignal.wait(1);
						}catch(InterruptedException e){
							
						}
						//System.out.println("sender wake up");
					}
					processMessage();
					
				}
			}catch(Exception e){
				e.printStackTrace();
			}catch(Throwable t){
				t.printStackTrace();
			}
		}
	}
这是一个while循环,用来不断的发送消息,主要看第13行的processMessage()方法

第二步:processMessage()方法

protected void processMessage() throws Exception{
		buffer.clear();
		ServerMessage pendingMessage = dequeue();
		if(pendingMessage == null){
			//Thread.yield();
			return;
		}
		buffer.put(pendingMessage.getData());
		buffer.flip();
		channel.send(buffer, pendingMessage.getSocketAddress());
		//System.out.println(DateTimeUtil.getCurDateTime()+" s:"+StringUtil.convert(pendingMessage.getData())+" to  :"+pendingMessage.getSocketAddress().toString());
	}
第3行:从消息发送队列取出一条消息,ServerMessage对象主要包括消息内容和Client地址信息两个属性

第8行:将消息内容放到缓冲区

第10行:将消息内容发送给Client
那么问题来了,Receiver中加入队列的ClientMessage怎么转为ServerMessage并加入到Sender的消息发送队列的呢?这就涉及另外一个重要的角色Messenger


四、Messenger

这也是一个实现Runnable接口的对象,所以首先找到run()方法

第一步:run()方法

@Override
	public void run() {
		this.started = true;
		
		while(stoped == false){
			try{
				procMessage();
			}catch(Exception e){
				e.printStackTrace();
			}catch(Throwable t){
				t.printStackTrace();
			}
		}

	}
这是一个while循环,用来不断的发送消息,主要看第7行的procMessage()方法


第二步:procMessage()方法

private void procMessage() throws Exception{
		ClientMessage m = this.obtainMessage();
		if(m == null){
			try{
				Thread.sleep(5);
			}catch(Exception e){
				;
			}
			return;
		}
		// 对终端发布消息
		this.deliverMessage(m);
		
	}
第2行:从队列取出一条Client发过来的消息,ClientMessage是不是很眼熟?我们看下this.obtainMessage()这个方法

private ClientMessage obtainMessage() throws Exception{
  return connector.receive();
 }
它又调用了connector的receiver()方法,connector就是UdpConnector对象,那么我们看下UdpConnector的receiver()方法
public ClientMessage receive() throws Exception {
  return receiver.receive();
 }

什么?它又调用了receiver的receiver()方法,好吧,绕了半天,原来最终是从receiver的消息队列中取得消息的。

第12行:处理Client发来的消息,那么我们具体看下deliverMessage(m)这个方法


第三步、deliverMessage(ClientMessage m)方法

private void deliverMessage(ClientMessage m) throws Exception{
		//System.out.println(this.hostThread.getName()+" receive:"+StringUtil.convert(m.getData()));
		//System.out.println(m.getSocketAddress().getClass().getName());
		String uuid = m.getUuidHexString();
		//ClientStatMachine csm = NodeStatus.getInstance().getClientStat(uuid);
		ClientStatMachine csm = nodeStat.getClientStat(uuid); // 查找内存中的状态机
		if(csm == null){//
			csm = ClientStatMachine.newByClientTick(m); // 创建状态机
			if(csm == null){
				return;
			}
			nodeStat.putClientStat(uuid, csm);
		}
		// 查找是否有消息发送给终端
		ArrayList<ServerMessage> smList = csm.onClientMessage(m);
		if(smList == null){
			return;
		}
		for(int i = 0; i < smList.size(); i++){
			ServerMessage sm = smList.get(i);
			if(sm.getSocketAddress() == null)continue;
			this.connector.send(sm);
		}
		
	}
第4行:是一个标准的UUID,用来唯一标识每一个用户

第6行:根据UUID去内存中查找有没有状态机csm,可以理解为一个在线用户对象

第7-13行:内存中没有该用户则创建一个并保存

第15行:根据Client发送过来的消息,判断需不需要回复、是否有离线的消息需要发给此用户,消息封装为ServerMessage并加入到集合,因为可能会有多条消息需要下发

第22行:将ServerMessage推送给客户端,看到这里,我们不禁有疑问,Sender不是用来给Client推送消息的吗?我们看下this.connector.send(sm);这行代码执行了什么,它是调用UdpConnector对象的send(ServerMessage message)方法,让我们看下这个方法

public boolean send(ServerMessage message) throws Exception {
		return sender.send(message);
		
	}
它又调用了Sender对象的send(ServerMessage message)方法,继续跟踪下去

public boolean send(ServerMessage message){
		return enqueue(message);
	}
看到这里,我们应该能明白了,这个方法的作用是将需要推送的ServerMessage对象加入到Sender的消息队列,Sender的run()方法会从队列去取消息再推送给客户端。


到此,我们应该明白了UDP模式Client与DDPush的交互流程。好了,本篇文章的讲解也就到此结束了,如有问题,欢迎大家指正!









 类似资料: