当前位置: 首页 > 工具软件 > Kamike.divide > 使用案例 >

kamike.divide_下一个开源项目Kamike.fast的框架已经搭好,是采用UDP进行完整的大文件传输。...

欧阳飞章
2023-12-01

下一个开源项目Kamike.fast的框架已经搭好,计划用一到两天时间开发完毕。

采用UDP协议,在跨大洲之间进行完整的大文件(超过2GB)传输。

之前在项目里面一直用aspera的软件,aspera也就是速珀,作为一个软件,最近刚拿了艾美奖。但是拿奖之后,出名了,就被IBM收购了。

我担心IBM收购后,Aspera估计就没那么便宜了,所以准备根据自己使用Aspera的经验,写一个看看。作为Aspera万一被IBM涨价后的备份方案。

主要解决思路和技术点如下:

对每个报文进行文件任务标志。可以同时传输多个文件。

发送文件内容之前,先发送文件名。然后返回文件名收到后的确认,然后再开始传输。

对发送报文进行计数,每隔1000个报文(某个时间间隔),接收端发送一个计数报文,报告丢包率。采取降低发送速度等措施。

长期丢包率在99%以上,尝试提高发送速度一倍。如果丢包率超过10%就降速。

文件传输中断,有中断信息。接收端长期没有接受到文件报文,则终止任务。

发送完毕后,接收端发送完毕报文,结束发送。

10000个报文组成一个块的概念,接收端每收到10000个(某个数字)报文,发送接受块成功的消息。整个块不会重复发送。如果块出现某个报文发送失败,则重新发送报文。

每个UDP报文,大小是1464字节,其中64个字节用来标示任务。

采用java编写,在web端使用,在tomcat启动时,也就是servlet的contextlistener启动的时候,启动守护进程,监听udp的80端口。是否与servlet进行协同,待定。

为了支持子网,客户端只负责发送和上传数据,不能下载数据。所以计数和控制,可能需要tcp的参与。

----------------------------

继续整理思路。

首先是报文,为了降低开发工作量。

先使用udp报文的48个字节。剩下24个还没想好。

首先将48个字节分为6个long类型。

第一个long,标识版本和报文类型。

比如1表示是数据传输

2表示是重传通知等等。

第二个long,是uuid的high位

第三个long,是uuid的low位。

一起标识一个文件。

第四个long是包序号。

第五个long是包内报文序号

第六个long是下面的报文长度。

------------------------------

继续:如果图省事,这个待定

第七个long是在被传输文件里面的位移量

第八个long是在传输文件里面的写入长度

--------------

继续。

一开始为了测试方便,将udp报文的数据长度设为1024,总长度是1024+64=1088

将缓冲窗口设为512KB,即在内存开辟一个512KB的缓冲器,当收满512个UDP报文时,写内存中的数据写入目标文件。

如果512个报文在单位时间内,没有收满,说明有丢包,则选择丢失的报文重传。丢包判断待定。在UDT协议里面是采用等待4倍的RTT时间判断的,在4倍的RTT时间后未到达的报文都被判断为丢失。

这个可以测试后再做决定。

--------------------------------

目前完成了udp的服务启动。数据传输窗口的相关开发。

到现在的总结:其实只要保证窗口内的512KB数据传输成功即可。每512KB作为一个window,然后返回确认信息。客户端确认接到512KB完成的确认信息后,再去发送下一个512KB。

确认信息里面可以加入crc校验或者MD5校验。客户端判断一下是否正确,决定是否重传。如果客户端决定,继续发送,则直接发送报文即可。服务器端,接到新的报文,如果是新报文ID,则将旧512KB数据写入实体文件,然后更新window,将收到的第一个报文写入window的相应位置。

如果客户端没有收到服务器端的window结束的确认报文,则等待,而不发送新的报文。服务器端等待4个RTT时间,再发送确认报文,最多重试5次。如果5次客户端都没收到确认报文,不发送新数据,服务器没有接受新数据,则服务器端认为网络中断。停止任务。将任务写入临时文件。等待客户端下一次重传唤醒。

现在先去睡觉,明天凌晨4点起来再写。

刚才浏览下网页又有新的收获:

chrome的websocket已经实现了udp,估计这个程序的上传客户端可以直接拿javascript来写了。比较理想了。

copy代码参考如下:

// Handle the data responsevar handleDataEvent = function(d) {

var data = chrome.socket.read(d.socketId);

console.log(data);};// Create the Socketchrome.socket.create('udp', '127.0.0.1', 1337, { onEvent: handleDataEvent },

function(socketInfo) {

// The socket is created, now we want to connect to the service

var socketId = socketInfo.socketId;

chrome.socket.connect(socketId, function(result) {

// We are now connected to the socket so send it some data

chrome.socket.write(socketId, arrayBuffer,

function(sendInfo) {

console.log("wrote " + sendInfo.bytesWritten);

}

);

});

});

----------------------

四点多起床,开工。

之前的思路有点问题,关于客户端停止发送这一点,会导致传输效率变低,应该是客户端继续发送,直到5个RTT时间后,没有收到上个window结束信号之后,再停止发送。

UDP之所以比TCP效率高,本质原因是TCP的一个bug,TCP将RTT时间和带宽联系在一起,片面认为RTT时间越长,带宽越低,而真实情况是,RTT和带宽没有直接关系。如果跟RTT建立联系了,我写UDP传输就没意思了。

------------------------

目前进展顺利,更新几个重要的注释:

//解析中靶情况

//取出自己已经发送的包的数量

//统计中靶的数量

//判断丢包率

//如果丢包率高于20%,则降低传输速度20%

//如果丢包率等于99%以上,则提高120%的速度

//如果箭已经射完,但是靶上还有箭没有,则重射,参数windowId,packetId,全部放入内存了

//如果确认某个window的最后一箭的消息丢失,等5个rrt,重新放这个window的最后一箭

break;

case Data:

//如果是某个windows最后一箭,则写入文件。发回确认最后一箭后的中靶情况。换一个window。

//如果window不是当前window了。那么返回全部中靶信息。

//重新设计头部

//high

//low

//window long window 序号

//type int 类别

//id int 包序号

//length int 包长度

//count int window内包数

非常像射箭,先准备好10支剑,然后连续射向靶心,射完箭袋里面的箭后,数靶心上的箭数目,如果少于射出去的箭的个数,那么补射没射中的几只箭。

如果补射都中了,就换下一个靶子,重新开始。

上面的逻辑实现代码如下:

DatagramPacket packet = new DatagramPacket(buffer, buffer.length);

socket.receive(packet);

//解析头部

this.header.load(packet);

//是否是数据的

PacketType type = PacketType.values()[this.header.getType()];

switch (type) {

//为了发送,发现丢包,调整速度

case Target:

Archer archer = inst.getArcher(header.getHigh(), header.getLow());

if (archer == null) {

return;

}

archer.update();//有回包,说明网络OK

int packetSend = archer.getPacket(header.getWindow());

lost = (packetSend - header.getScore()) / packetSend;

//archer.setBandwidth((long)(archer.getBandwidth()/lost));

if (lost > 0.10) {

archer.setBandwidth((long) (archer.getBandwidth() / lost));

} else {

if (lost 

archer.setBandwidth((long) (archer.getBandwidth() * 1.2));

} else {

//啥也不做保持原速

}

}

//补射

this.result.load(header, packet);

for (int i = 0; i 

byte[] data = result.getBuffer();

if (data[i] == 0x0) {

Miss miss = new Miss();

miss.setPacket(i);

miss.setWindow(header.getWindow());

archer.miss(miss);

}

}

break;

//

case Data:

Target target = inst.getTarget(header.getHigh(), header.getLow());

target.update();

//新的文件传递

if (target == null) {

Bow initBow = new Bow(packet.getAddress(), packet.getPort());

target = new Target(initBow, header.getHigh(), header.getLow());

target.open();

inst.addTarget(target);//这里启动的线程,此线程只是用来测量心跳

}

//是否需要换弓,应对对称式的nat,必要的情况可以缓冲多把弓,目前只有一把

Bow bow = target.getBow();

if (bow != null) {

if ((!bow.getAddress().equals(packet.getAddress())) && bow.getPort() != packet.getPort()) {

bow.close();

Bow initBow = new Bow(packet.getAddress(), packet.getPort());

target.setBow(initBow);

}

}

Iterator iter = target.Windows(); //此处没有限制window的数量,有可能导致系统崩溃,不过测试应该问题不大

boolean isExist = false;

Window win = null;

while (iter.hasNext()) {

win = iter.next();

if (win.getId() == header.getWindow()) {

isExist = true;

break;

}

}

if (win == null || !isExist) {

//此报文对应的窗口不存在

if (header.getWindow() > target.getPosition() / FastConfig.WindowLength) {

win = new Window(header);

this.result.load(header, packet);

win.setData(header.getId(), this.result.getBuffer());

target.addWindow(win);

} else {

//如果是陈旧报文,则丢弃此报文

}

} else {

this.result.load(header, packet);

win.setData(header.getId(), this.result.getBuffer());

if (win.isFull()) {

target.write(header.getWindow(), buffer);

//删除此窗口

target.removeWindow(win);

}

}

break;

}

目前的代码已经开源到github,同样是lgpl协议。大家有兴趣的可以参考。

更新今天更新去掉了对guava的依赖。

 类似资料: