当前位置: 首页 > 工具软件 > KCP > 使用案例 >

kcp-go 源码分析(一)

徐正雅
2023-12-01

        项目中用的是"github.com/xtaci/kcp-go",这个仓库不仅仅实现了kcp算法,而且在kcp算法层面上又包装了一层,比如说提供了多种数据加密方式和FEC前向纠错,用起来非常方便。

        这一篇先分析kcp-go库对kcp算法包装这部分,先分析怎么用,怎么用都不知道去谈算法个人觉得没有意义,kcp算法层面的东西后面再分析。

1:先分析服务器层面的

func Listen(laddr string) (net.Listener, error) { return ListenWithOptions(laddr, nil, 0, 0) }

func ListenWithOptions(laddr string, block BlockCrypt, dataShards, parityShards int) (*Listener, error) {
	udpaddr, err := net.ResolveUDPAddr("udp", laddr)
	if err != nil {
		return nil, errors.WithStack(err)
	}
	conn, err := net.ListenUDP("udp", udpaddr)
	if err != nil {
		return nil, errors.WithStack(err)
	}

	return ServeConn(block, dataShards, parityShards, conn)
}
func ServeConn(block BlockCrypt, dataShards, parityShards int, conn net.PacketConn) (*Listener, error) {
	l := new(Listener)
	l.conn = conn
	l.sessions = make(map[string]*UDPSession)
	l.chAccepts = make(chan *UDPSession, acceptBacklog)
	l.chSessionClosed = make(chan net.Addr)
	l.die = make(chan struct{})
	l.dataShards = dataShards
	l.parityShards = parityShards
	l.block = block
	l.fecDecoder = newFECDecoder(rxFECMulti*(dataShards+parityShards), dataShards, parityShards)
	l.chSocketReadError = make(chan struct{})

	// calculate header size
	if l.block != nil {
		l.headerSize += cryptHeaderSize
	}
	if l.fecDecoder != nil {
		l.headerSize += fecHeaderSizePlus2
	}

	go l.monitor()
	return l, nil
}

调用kcp.Listen 或者 kcp.ListenWithOptions 这是入口,ServeConn会调用monitor,这个monitor方法是Listener类中的一个方法,不同的平台有不同的实现,readloop_generic.go和readloop_linux.go中都有相应的实现,最终都会回到defaultMonitor()中来,具体看实现。

func (l *Listener) defaultMonitor() {
	buf := make([]byte, mtuLimit)
	for {
		if n, from, err := l.conn.ReadFrom(buf); err == nil {
			if n >= l.headerSize+IKCP_OVERHEAD {
				l.packetInput(buf[:n], from)
			} else {
				atomic.AddUint64(&DefaultSnmp.InErrs, 1)
			}
		} else {
			l.notifyReadError(errors.WithStack(err))
			return
		}
	}
}

/*
ReadFrom(p []byte) (n int, addr Addr, err error)
type Addr interface {
	Network() string // name of the network (for example, "tcp", "udp")
	String() string  // string form of address (for example, "192.0.2.1:25", "[2001:db8::1]:80")
}
*/

其实就是不停的从conn中读数据,这个conn就是调用net.ListenUDP返回的对象,类似twisted中的服务器对象factory。n, from, err := l.conn.ReadFrom(buf) buf的空间是设置的mutLimit大小,每次最大读这么多内容,n是成功读取的长度,from是对方(client)的详细信息,如果数据满足一个整包的要求就把这些数据塞到packetInput中,

func (l *Listener) packetInput(data []byte, addr net.Addr) {
	dataValid := false
	if l.block != nil {
		l.block.Decrypt(data, data)
		data = data[nonceSize:]
		checksum := crc32.ChecksumIEEE(data[crcSize:])
		if checksum == binary.LittleEndian.Uint32(data) {
			data = data[crcSize:]
			dataValid = true
		} else {
			atomic.AddUint64(&DefaultSnmp.InCsumErrors, 1)
		}
	} else if l.block == nil {
		dataValid = true
	}

	if dataValid {
		l.sessionLock.Lock()
		s, ok := l.sessions[addr.String()]
		l.sessionLock.Unlock()

		var conv, sn uint32
		convValid := false
		if l.fecDecoder != nil {
			isfec := binary.LittleEndian.Uint16(data[4:])
			if isfec == typeData {
				conv = binary.LittleEndian.Uint32(data[fecHeaderSizePlus2:])
				sn = binary.LittleEndian.Uint32(data[fecHeaderSizePlus2+IKCP_SN_OFFSET:])
				convValid = true
			}
		} else {
			conv = binary.LittleEndian.Uint32(data)
			sn = binary.LittleEndian.Uint32(data[IKCP_SN_OFFSET:])
			convValid = true
		}

		if ok { // existing connection
			if !convValid || conv == s.kcp.conv { // parity or valid data shard
				s.kcpInput(data)
			} else if sn == 0 { // should replace current connection
				s.Close()
				s = nil
			}
		}

		if s == nil && convValid { // new session
			if len(l.chAccepts) < cap(l.chAccepts) { // do not let the new sessions overwhelm accept queue
				s := newUDPSession(conv, l.dataShards, l.parityShards, l, l.conn, addr, l.block)
				s.kcpInput(data)
				l.sessionLock.Lock()
				l.sessions[addr.String()] = s
				l.sessionLock.Unlock()
				l.chAccepts <- s
			}
		}
	}
}

这里边主要干了一下几件事儿了

a:数据有加密就执行解密 

b:读取kcp协议头里边的conv和sn

c:通过s, ok := l.sessions[addr.String()]判断当前这个连接是旧的连接还是新的连接,sessions里边保存了所有的client连接信息,如果是旧连接就把调用session的s.kcpInput(),如果是一个新连接进来并且conv合法就新创建一个客户端(udpsession)对象,把这个对象保存到listener的sessions当中。这里边在极端情况下可能会有问题,如果2个客户端在同一个局域网的情况下连接服务器 外网IP又相同,恰好2个客户端随机的端口又一样这样的话key=addr.String()会是一模一样,2个不同的客户端会被服务器认为是同一个。

func (s *UDPSession) kcpInput(data []byte) {
	var kcpInErrors, fecErrs, fecRecovered, fecParityShards uint64

	if s.fecDecoder != nil {
		if len(data) > fecHeaderSize { // must be larger than fec header size
			f := fecPacket(data)
			if f.flag() == typeData || f.flag() == typeParity { // header check
				if f.flag() == typeParity {
					fecParityShards++
				}

				// lock
				s.mu.Lock()
				recovers := s.fecDecoder.decode(f)
				if f.flag() == typeData {
					if ret := s.kcp.Input(data[fecHeaderSizePlus2:], true, s.ackNoDelay); ret != 0 {
						kcpInErrors++
					}
				}

				for _, r := range recovers {
					if len(r) >= 2 { // must be larger than 2bytes
						sz := binary.LittleEndian.Uint16(r)
						if int(sz) <= len(r) && sz >= 2 {
							if ret := s.kcp.Input(r[2:sz], false, s.ackNoDelay); ret == 0 {
								fecRecovered++
							} else {
								kcpInErrors++
							}
						} else {
							fecErrs++
						}
					} else {
						fecErrs++
					}
					// recycle the recovers
					xmitBuf.Put(r)
				}

				// to notify the readers to receive the data
				if n := s.kcp.PeekSize(); n > 0 {
					s.notifyReadEvent()
				}
				// to notify the writers
				waitsnd := s.kcp.WaitSnd()
				if waitsnd < int(s.kcp.snd_wnd) && waitsnd < int(s.kcp.rmt_wnd) {
					s.notifyWriteEvent()
				}

				s.uncork()
				s.mu.Unlock()
			} else {
				atomic.AddUint64(&DefaultSnmp.InErrs, 1)
			}
		} else {
			atomic.AddUint64(&DefaultSnmp.InErrs, 1)
		}
	} else {
		s.mu.Lock()
		if ret := s.kcp.Input(data, true, s.ackNoDelay); ret != 0 {
			kcpInErrors++
		}
		if n := s.kcp.PeekSize(); n > 0 {
			s.notifyReadEvent()
		}
		waitsnd := s.kcp.WaitSnd()
		if waitsnd < int(s.kcp.snd_wnd) && waitsnd < int(s.kcp.rmt_wnd) {
			s.notifyWriteEvent()
		}
		s.uncork()
		s.mu.Unlock()
	}
	atomic.AddUint64(&DefaultSnmp.InPkts, 1)
	atomic.AddUint64(&DefaultSnmp.InBytes, uint64(len(data)))
	if fecParityShards > 0 {
		atomic.AddUint64(&DefaultSnmp.FECParityShards, fecParityShards)
	}
	if kcpInErrors > 0 {
		atomic.AddUint64(&DefaultSnmp.KCPInErrors, kcpInErrors)
	}
	if fecErrs > 0 {
		atomic.AddUint64(&DefaultSnmp.FECErrs, fecErrs)
	}
	if fecRecovered > 0 {
		atomic.AddUint64(&DefaultSnmp.FECRecovered, fecRecovered)
	}

}
kcpInput看着挺复杂,前边长篇大论都是如果启动了FEC时根据FEC的规则读取数据。如果没有启用FEC,数据就不用处理就调用 s.kcp.Input(data, true, s.ackNoDelay)把数据塞到kcp算法中进行处理,然后判断kcp还能不能继续读和写进而发送相应的事件。
func (s *UDPSession) notifyReadEvent() {
	select {
	case s.chReadEvent <- struct{}{}:
	default:
	}
}
func (s *UDPSession) notifyWriteEvent() {
	select {
	case s.chWriteEvent <- struct{}{}:
	default:
	}
}
如果能继续读就向chReadEvent中发送数据,而Read()函数中有监听chReadEvent这个channel的事件
如果能继续写就向chWriteEvent中发送数据,而Write()中有监听chWriteEvent这个channel的事件
Read()就是经过kcp解包拿数据,Write()就是把数据写到kcp经过它的包装加上kcp包头。

2:在来看看客户端的一些逻辑

func Dial(raddr string) (net.Conn, error) { return DialWithOptions(raddr, nil, 0, 0) }
func DialWithOptions(raddr string, block BlockCrypt, dataShards, parityShards int) (*UDPSession, error) {
	// network type detection
	udpaddr, err := net.ResolveUDPAddr("udp", raddr)
	if err != nil {
		return nil, errors.WithStack(err)
	}
	network := "udp4"
	if udpaddr.IP.To4() == nil {
		network = "udp"
	}

	conn, err := net.ListenUDP(network, nil)
	if err != nil {
		return nil, errors.WithStack(err)
	}

	return NewConn(raddr, block, dataShards, parityShards, conn)
}

// NewConn3 establishes a session and talks KCP protocol over a packet connection.
func NewConn3(convid uint32, raddr net.Addr, block BlockCrypt, dataShards, parityShards int, conn net.PacketConn) (*UDPSession, error) {
	return newUDPSession(convid, dataShards, parityShards, nil, conn, raddr, block), nil
}

// NewConn2 establishes a session and talks KCP protocol over a packet connection.
func NewConn2(raddr net.Addr, block BlockCrypt, dataShards, parityShards int, conn net.PacketConn) (*UDPSession, error) {
	var convid uint32
	binary.Read(rand.Reader, binary.LittleEndian, &convid)
	return NewConn3(convid, raddr, block, dataShards, parityShards, conn)
}

// NewConn establishes a session and talks KCP protocol over a packet connection.
func NewConn(raddr string, block BlockCrypt, dataShards, parityShards int, conn net.PacketConn) (*UDPSession, error) {
	udpaddr, err := net.ResolveUDPAddr("udp", raddr)
	if err != nil {
		return nil, errors.WithStack(err)
	}
	return NewConn2(udpaddr, block, dataShards, parityShards, conn)
}

客户端调用Dial 或者 DialWithOptions就会得到一个Session对象(类似twisted中的protocol对象)

这里边重点说一下NewConn2方法,通过rand.Reader随机的产生了4个字节的convid,这个是convid是kcp协议头的组成部分,服务器收到数据后读取convid 作为数据合法性判断的重要依据。

客户端获取了session对象之后 就可以正常的read和write了,跟上边server介绍的read、write是一抹一样的情况。

 类似资料: