本篇文章主要讲解Client(客户端)如何连接到DDPush,并向DDPush发送消息(主要是心跳包和确认信息),和如何接收APPServer推送给DDPush的消息,本篇文章分析官方推荐的UDP工作模式。
UDP模式主要涉及到以下几个重要的类:
1、UdpConnector 监听端口的UDP数据包
2、Receiver 接受终端消息
3、Sender 向终端发送消息
4、Messenger 从Receiver的消息队列中取出消息, 再从内存中查找对应的状态机以及需要发送给终端的消息, 最后加入到Sender的消息队列并由Sender发出去
一、UdpConnector
关键方法start()
public void start() throws Exception{
if(antenna != null){
throw new Exception("antenna is not null, may have run before");
}
antenna = DatagramChannel.open();
antenna.socket().bind(new InetSocketAddress(port));
System.out.println("udp connector port:"+port);
//non-blocking
antenna.configureBlocking(false);
antenna.socket().setReceiveBufferSize(1024*1024*PropertyUtil.getPropertyInt("CLIENT_UDP_BUFFER_RECEIVE"));
antenna.socket().setSendBufferSize(1024*1024*PropertyUtil.getPropertyInt("CLIENT_UDP_BUFFER_SEND"));
System.out.println("udp connector recv buffer size:"+antenna.socket().getReceiveBufferSize());
System.out.println("udp connector send buffer size:"+antenna.socket().getSendBufferSize());
this.receiver = new Receiver(antenna);
this.receiver.init();
this.sender = new Sender(antenna);
this.sender.init();
this.senderThread = new Thread(sender,"AsynUdpConnector-sender");
this.receiverThread = new Thread(receiver,"AsynUdpConnector-receiver");
this.receiverThread.start();
this.senderThread.start();
}
第6行:绑定一个端口端口,监听此端口的数据包
第16行:receiver是一个runnable对象,用来接受Client发来的消息
第19行:sender是一个runnable对象,用来向Client推送消息
第21-24行:分别启动receiver和sender
二、Receiver
第一步:run()方法
public void run(){
while(!this.stoped){
try{
//synchronized(enQueSignal){
processMessage();
// if(mq.isEmpty() == true){
// enQueSignal.wait();
// }
//}
}catch(Exception e){
e.printStackTrace();
}catch(Throwable t){
t.printStackTrace();
}
}
}
这是一个while循环,用来不断的接收消息,主要看第5行的processMessage()方法
protected void processMessage() throws Exception{
address = null;
buffer.clear();
try{
address = this.channel.receive(buffer);
}catch(SocketTimeoutException timeout){
}
if(address == null){
try{
Thread.sleep(1);
}catch(Exception e){
}
return;
}
buffer.flip();
byte[] swap = new byte[buffer.limit() - buffer.position()];
System.arraycopy(buffer.array(), buffer.position(), swap, 0, swap.length);
ClientMessage m = new ClientMessage(address,swap);
enqueue(m);
//System.out.println(DateTimeUtil.getCurDateTime()+" r:"+StringUtil.convert(m.getData())+" from:"+m.getSocketAddress().toString());
}
第5行:接收数据填充到buffer,因为DatagramChannel设置成非阻塞了,所以此方法不管有无数据都会立即返回,所以第9行会做一个判断
第19-20:将buffer缓冲区的数据拷贝到swap数据
第22行:将byte[]数组和Client的address信息封装成ClientMessage对象
第24行:将ClientMessage对象加入到消息队列,等待处理(Messenger会从此队列取消息)
三、Sender
第一步:run()方法
public void run(){
while(!this.stoped){
try{
synchronized(enQueSignal){
while(mq.isEmpty() == true && stoped == false){
try{
enQueSignal.wait(1);
}catch(InterruptedException e){
}
//System.out.println("sender wake up");
}
processMessage();
}
}catch(Exception e){
e.printStackTrace();
}catch(Throwable t){
t.printStackTrace();
}
}
}
这是一个while循环,用来不断的发送消息,主要看第13行的processMessage()方法
第二步:processMessage()方法
protected void processMessage() throws Exception{
buffer.clear();
ServerMessage pendingMessage = dequeue();
if(pendingMessage == null){
//Thread.yield();
return;
}
buffer.put(pendingMessage.getData());
buffer.flip();
channel.send(buffer, pendingMessage.getSocketAddress());
//System.out.println(DateTimeUtil.getCurDateTime()+" s:"+StringUtil.convert(pendingMessage.getData())+" to :"+pendingMessage.getSocketAddress().toString());
}
第3行:从消息发送队列取出一条消息,ServerMessage对象主要包括消息内容和Client地址信息两个属性
第8行:将消息内容放到缓冲区
第10行:将消息内容发送给Client
那么问题来了,Receiver中加入队列的ClientMessage怎么转为ServerMessage并加入到Sender的消息发送队列的呢?这就涉及另外一个重要的角色Messenger
四、Messenger
这也是一个实现Runnable接口的对象,所以首先找到run()方法
第一步:run()方法
@Override
public void run() {
this.started = true;
while(stoped == false){
try{
procMessage();
}catch(Exception e){
e.printStackTrace();
}catch(Throwable t){
t.printStackTrace();
}
}
}
这是一个while循环,用来不断的发送消息,主要看第7行的procMessage()方法
第二步:procMessage()方法
private void procMessage() throws Exception{
ClientMessage m = this.obtainMessage();
if(m == null){
try{
Thread.sleep(5);
}catch(Exception e){
;
}
return;
}
// 对终端发布消息
this.deliverMessage(m);
}
第2行:从队列取出一条Client发过来的消息,ClientMessage是不是很眼熟?我们看下this.obtainMessage()这个方法
private ClientMessage obtainMessage() throws Exception{什么?它又调用了receiver的receiver()方法,好吧,绕了半天,原来最终是从receiver的消息队列中取得消息的。
第12行:处理Client发来的消息,那么我们具体看下deliverMessage(m)这个方法
第三步、deliverMessage(ClientMessage m)方法
private void deliverMessage(ClientMessage m) throws Exception{
//System.out.println(this.hostThread.getName()+" receive:"+StringUtil.convert(m.getData()));
//System.out.println(m.getSocketAddress().getClass().getName());
String uuid = m.getUuidHexString();
//ClientStatMachine csm = NodeStatus.getInstance().getClientStat(uuid);
ClientStatMachine csm = nodeStat.getClientStat(uuid); // 查找内存中的状态机
if(csm == null){//
csm = ClientStatMachine.newByClientTick(m); // 创建状态机
if(csm == null){
return;
}
nodeStat.putClientStat(uuid, csm);
}
// 查找是否有消息发送给终端
ArrayList<ServerMessage> smList = csm.onClientMessage(m);
if(smList == null){
return;
}
for(int i = 0; i < smList.size(); i++){
ServerMessage sm = smList.get(i);
if(sm.getSocketAddress() == null)continue;
this.connector.send(sm);
}
}
第4行:是一个标准的UUID,用来唯一标识每一个用户
第6行:根据UUID去内存中查找有没有状态机csm,可以理解为一个在线用户对象
第7-13行:内存中没有该用户则创建一个并保存
第15行:根据Client发送过来的消息,判断需不需要回复、是否有离线的消息需要发给此用户,消息封装为ServerMessage并加入到集合,因为可能会有多条消息需要下发
第22行:将ServerMessage推送给客户端,看到这里,我们不禁有疑问,Sender不是用来给Client推送消息的吗?我们看下this.connector.send(sm);这行代码执行了什么,它是调用UdpConnector对象的send(ServerMessage message)方法,让我们看下这个方法
public boolean send(ServerMessage message) throws Exception {
return sender.send(message);
}
它又调用了Sender对象的send(ServerMessage message)方法,继续跟踪下去
public boolean send(ServerMessage message){
return enqueue(message);
}
看到这里,我们应该能明白了,这个方法的作用是将需要推送的ServerMessage对象加入到Sender的消息队列,Sender的run()方法会从队列去取消息再推送给客户端。
到此,我们应该明白了UDP模式Client与DDPush的交互流程。好了,本篇文章的讲解也就到此结束了,如有问题,欢迎大家指正!