今天。我们来学习 ddpush 中的一个UDP服务器的数据发送类。 ddpush 中 给我们提供了两种和服务器的连接方式。一种是UDP 一种是TCP 在ddpush的官方网站上,ddpush的开发者推荐大家使用UDP这种方式来做推送。理由 就是 UDP的带机量 要比 TCP 要多很多。官方说法是 几十上百倍。咱们也不去追究这个细节数字问题。应为这没有必要去纠结。我们只需要知道 UDP 比 TCP 的承载带计量 要高很多 就可以了。
应为一些技术方面的原因。我最近先给大家 把ddpush 的UDP服务器详细的讲解一遍。讲解完成之后。我们再来讲解一下ddpush的服务器启动、控制 相关的类。这些完成之后。我们就来实际的使用 运用ddpush来做推送。当然后续的还有一些其他文章。
好了。不多说了。我们先来看看这个Sender.java这个类。这个类在ddpush的 org.ddpush.im.v1.node.udpconnector 这个包下。 好了我们现在来上代码。这些代码基本上都被我添加了注释。大家好好的看。然后去理解。
//UDP 数据发送类
public class Sender implements Runnable{
//UDP 通道。
protected DatagramChannel channel;
//数据包收包计数器
protected AtomicLong queueIn = new AtomicLong(0);
//数据包 发包计数器
protected AtomicLong queueOut = new AtomicLong(0);
//定义 UDP 发射的数据包的最大缓冲区大小
protected int bufferSize = Constant.PUSH_MSG_HEADER_LEN+PropertyUtil.getPropertyInt("PUSH_MSG_MAX_CONTENT_LEN");
//服务器是否被暂停标志
protected boolean stoped = false;
// byte 缓冲
protected ByteBuffer buffer;
//这个就暂且看做是一个线程锁对象吧
protected Object enQueSignal = new Object();
//并发线程安全队列。这里主要用来存放需要下发的消息
protected ConcurrentLinkedQueue<ServerMessage> mq = new ConcurrentLinkedQueue<ServerMessage>();
//构造
public Sender(DatagramChannel channel){
this.channel = channel;
}
//初始化、 给byte缓冲区分配一块存储区域、大小是UDP发射的数据包的最大byte个数
public void init(){
buffer = ByteBuffer.allocate(bufferSize);
}
//停止
public void stop(){
this.stoped = true;
}
//UDP 数据发射器的线程体
public void run(){
//这个发射器在服务器启动时 就开始一直运行。直到服务器停止
while(!this.stoped){
try{
synchronized(enQueSignal){
//如果没有消息 并且 服务器没有关闭。就一直等待
while(mq.isEmpty() == true && stoped == false){
try{
enQueSignal.wait(1);
}catch(InterruptedException e){
}
//System.out.println("sender wake up");
}
//如果 有消息了 或者 服务器被关闭了。就开始处理UDP数据
//关于 服务器被关闭了 还处理消息这个可以不用纠结
//应为服务器关闭了。就必去去跳出内部的线程锁 执行外部循环
//外部循环才是判断服务器是否被关闭。所以得走到这里。
processMessage();
}
}catch(Exception e){
e.printStackTrace();
}catch(Throwable t){
t.printStackTrace();
}
}
}
//处理要发送的UDP数据
protected void processMessage() throws Exception{
//清空缓冲区。
buffer.clear();
//从并发线程安全的消息队列中取出一个被封装的待发射的UDP消息
ServerMessage pendingMessage = dequeue();
//如果消息为空。就返回
if(pendingMessage == null){
//Thread.yield();
return;
}
//将待发射的UDP消息写入缓冲区
buffer.put(pendingMessage.getData());
buffer.flip();
//发射这个UDP数据包
channel.send(buffer, pendingMessage.getSocketAddress());
//System.out.println(DateTimeUtil.getCurDateTime()+" s:"+StringUtil.convert(pendingMessage.getData())+" to :"+pendingMessage.getSocketAddress().toString());
}
//入队 将一个待发射的UDP数据包放入消息队列 并更新入队的消息个数
protected boolean enqueue(ServerMessage message){
boolean result = mq.add(message);
if(result == true){
queueIn.addAndGet(1);
}
return result;
}
//出队 取出一个待发射的UDP数据包 并更新 取出的消息个数
protected ServerMessage dequeue(){
ServerMessage m = mq.poll();
if(m != null){
queueOut.addAndGet(1);
}
return m;
}
//发射一个UDP数据包、这个主要用来在外部调用、外部服务器像客户端发送UDP数据包来调用
public boolean send(ServerMessage message){
return enqueue(message);
}
}
恩恩。基本上这个UDP发射器。的内容就是上面的。大家看看就可以明白了。我特意添加了比较多的注释。
好了。我们来说一下这个类。首先要知道 要明确。这个类 是用来发射所有的UDP消息的。 UDP服务器在启动之后。就将所有的通过UDP服务器监听的端口发射出去的数据 都放到这里面处理了。
然后我们来讲讲这个类的大概的一个流程。
1, 外部创建该发射器类对象,同时将外部的 UDP服务器的 DatagramChannel 传递进来。在该类的构造中 我们把这个 DatagramChannel 保存成了类的属性。 以方便我们发射UDP数据时使用。 这个类中 创建了一个消息队列。就是 ConcurrentLinkedQueue mq 这个消息队列 用来存放所有的需要发射的UDP数据消息包。 服务器在外部 需要发射UDP消息给客户端的时候。就需要调用Sender这个类的
public boolean send( ServerMessage message )
这个函数 在这个函数里。会将收到的这个消息包 放入消息队列中。 这就是外部的消息发送到Sender里面 要求 Sender把这个UDP消息发射出去时。直接执行的一些步骤。
然后我们再来看。这个类Sender 的定义。
public class Sender implements Runnable
这是一个 Runnable的子类。这个大家肯定都会明白。这个类中肯定有一个run函数。同时在外部需要使用Thread 来创建线程。同时需要外部调用 start 函数。来启动这个线程。在外部创建了这个Sender类之后。肯定是要调用start这个函数。在start函数被调用之后。Sender对象内部 在最终会执行到 run 函数里。
这是标准或者说是常规的线程 我就不多讲了。
我们来看看这个run函数 里面到底做了什么。
//UDP 数据发射器的线程体
public void run(){
//这个发射器在服务器启动时 就开始一直运行。直到服务器停止
while(!this.stoped){
try{
synchronized(enQueSignal){
//如果没有消息 并且 服务器没有关闭。就一直等待
while(mq.isEmpty() == true && stoped == false){
try{
enQueSignal.wait(1);
}catch(InterruptedException e){
}
//System.out.println("sender wake up");
}
//如果 有消息了 或者 服务器被关闭了。就开始处理UDP数据
//关于 服务器被关闭了 还处理消息这个可以不用纠结
//应为服务器关闭了。就必去去跳出内部的线程锁 执行外部循环
//外部循环才是判断服务器是否被关闭。所以得走到这里。
processMessage();
}
}catch(Exception e){
e.printStackTrace();
}catch(Throwable t){
t.printStackTrace();
}
}
}
代码很简单。就是 一个while 死循环、在内部 用一个synchronized 线程锁 锁住一个 内部的while 循环。这个内部的while循环。一直在判断 mq 这个消息队列里是否有消息 同时还判断 服务器是否被stop了。
如果没有消息 并且服务器没有被stop 那就进入内部while循环的循环体。里面是一个wait 这个大家也都懂,线程休眠。 然后就内部while循环体结束。然后就看这段代码。大家肯定能够明白。这里外部家了一个线程锁。内部常规情况下 肯定是要执行完毕 才能跳出这个线程锁之外再执行。所以这里的内部循环加了一个 stoped == false 这个判断,用来判断服务器是否被关闭了。当服务器被关闭了。不管mq里有没有消息。都会跳出这个内部循环、跳出synchronized 线程锁。因为服务器都关了。你还在这等待个啥? 所以这一小段线程锁、内部while循环 基本就是这个意思。也比较简单。大家看看就能懂了。
然后我们来说 synchronized 线程锁的下面 调用了一个函数。 processMessage(); 这个 看名字就能知道大概。处理消息 这再次提醒了我 写代码的函数名、变量名、类名神马的 都尽量要写一个有意义、有含义、能代表这个东西的作用的一个名字、好了题外话。不说了。
我们继续来看 调用了 processMessage() 这个函数 我们也知道这是用来处理消息的。好了来看下这个 processMessage函数的具体功能。
//处理要发送的UDP数据
protected void processMessage() throws Exception{
//清空缓冲区。
buffer.clear();
//从并发线程安全的消息队列中取出一个被封装的待发射的UDP消息
ServerMessage pendingMessage = dequeue();
//如果消息为空。就返回
if(pendingMessage == null){
//Thread.yield();
return;
}
//将待发射的UDP消息写入缓冲区
buffer.put(pendingMessage.getData());
buffer.flip();
//发射这个UDP数据包
channel.send(buffer, pendingMessage.getSocketAddress());
}
这个函数、最后的一行打印代码我给去掉了。就是输出消息到控制塔。看发送了什么消息 什么时间发送的。发送的目标地址。 就是System.out.println() 打印。代码太长了。我就给删了。是无关紧要的一行代码。用来测试的。
好了言归正传。我们来看这个函数的工作流程
1, 清空buffer缓冲区。这个缓冲区就是用来存放 或者说是 包装UDP具体要发送的byte数据的。
2, 从 mq 里取出一个UDP消息包 这里 取出 这个功能是调用了 dequeue() 这个函数 出对
3, 判断这个取出的UDP数据包是否为null 这肯定的 数据包都是 null 你还发啥UDP消息。
4, 将要发送的UDP数据 写入 buffer 这个缓冲区里 用来准备将这些数据发射出去。
5, buffer.flip() 这个函数 不知道大家了解不了解,这种东西不知道的都可以 搜索 一下就又一大堆讲这个东西的。 好吧 我也在这里重复的复述一遍这个函数的功能,flip()这个函数的功能就是将buffer的 limit 设置成当前的position 再把 position 设置成 0 不明白为啥? 那我再讲讲他的作用。Buffer 或者我们来说ByteBuffer 这个缓冲区 就像排列在一起的小盒子 在操作这一排小盒子的时候。最基本要明白的有两个东西 一个是position 还一个是limit 首先 position 是表示当前要操作的小盒子的位置或者说下标,buffer 每次被操作一次 这个position下标就向后移动一次, position的最开始的位置也就是第一个小盒子 位置是 0 最后一个小盒子的位置就是 小盒子个数 -1 再来说一下limit 这个玩意儿是用来限制 或者说是用来标志当前我们创建的这个buffer可以操作的最后一个小盒子 因为小盒子太多。都在内存里 全都排列起来的。所以我们需要用position 和 limit来标识我们所管理的小盒子的起始位置、当前要操作的位置、和终点 我们不能越界 会出大麻烦的。 好了明白了position 和limit 我们再来看我上面说的 flip() 这个函数 把limit设置位position 然后position设置为0 他的目的就是 重新将我们读取的当前位置下标设置到最开始的位置。应为我们已经把UDP要发射的数据都放到了buffer里放入就是写入 这个操作也是 放入一个byte position就会向下移动一个。当我们数据放完了。position之后的不是我们的数据了。所以我们需要把limit 设置成position 告诉别人 我们的数据只有到 position 的这个位置 这个位置之后的就不是我们写入的数据。就不要去读了。 然后我们在去发送UDP数据的时候。我们操作这个buffer 一个一个的取出数据发送出去。就是从position开始读取buffer里的数据。所以我们去读取buffer里的数据 就应该从 0 从头开始读我们写入到buffer里的数据。好了大家应该明白了吧。感觉这里我写的有点啰嗦了。没办法 我不是一个爱说话的人。在言谈方面并不好。我的初衷 或者说是我的想法是让大家能够看明白。好了下面继续看processMessage函数
6, 调用UDP Cannel的send函数 将具体的要发送的数据(已经写入到buffer里的数据) 发射出去。
好了 这就是 processMessage 这个函数的所有功能性的内容。
然后我们再次回到run函数里。在调用 processMessage函数 之后。我们可以看到基本上就没啥东西了。
好了 ddpush 学习之路 12 的Sender.java 这个类 我们大概都将完了。当然 我还想大家都明白。不要被我上面的篇幅给搞糊涂了。我下面做一个总结 或者说是对 ddpush 的UDP 服务器像客户端发送消息的这个 发送 这一块来流程梳理一下或者说流程讲解一下 当然 这个UDP 服务器向客户端发送消息的整体的这个发送功能就在这个 Sender.java 这个类里
首先 我们外部 创建了 Sender 这个类 他是一个Thread 外部start之后。Sender 内部最后开始执行run 在run中 有个while 这个while有点类似while true 就是死循环。 然后在这个循环体里 不断的去问消息队列 有没有要发送的 UDP消息 如果没有要发送的UDP消息 就一直在上面等待。 一旦发现 消息队列里 有消息 就开始从消息队列中 取出 一个消息 并对这个取出的消息对象做基本的非空验证。 如果这个消息正常 就将这个消息的具体要发送的byte数据写入buffer缓冲区 用来准备对接到UDP cannel里进行发射。 消息写入buffer之后。就开始发射了。这里的发射器是用的UDP服务器对象 DatagramChannel 将UDP消息发送给客户端
上面的是线程 处理 要发送的消息 查询是否有消息 有消息就发送 然后再继续查询 我们下面来看看这个消息是怎么到 消息队列的 这也比较简单
流程就是 外部调用 Sender 对象的 send(ServerMessage message)这个函数。传入要发射的UDP消息封装对象。 然后Sender收到了这个要发射的UDP消息包之后。将这个消息包放到了 消息队列中 然后就不管了。
这就是UDP消息进入 消息队列中的一个流程。
基本上整体流程就是。Sender 线程体run里一直在查看 消息队列 中是否有消息 有消息就去发送。 没消息就继续等待,继续查询。 然后外部如果有消息要发送 就会将要发送给客户端的UDP消息包装起来 然后将这个UDP消息包添加到 消息队列中 然后 run函数里就第一时间发现了这个消息包。然后就调用 消息发送 函数 这个UDP消息就被发射出去了。
突然感觉我说的好啰嗦。 就这样吧。主要能大家能看懂、看明白就好。
by brok1n 20150328