peer是整个BT通信中最复杂的部分,主要是里面各种消息的发送和一些choke和unchoke策略,piece选择策略等等。peer manager用于管理peer,本程序中维护多个peer进行远程通信。
本程序中peer是由CPeerLink类实现的。CPeerLink类中某些重要成员说明如下:
m_bAmChoking:我是否阻塞peer,true表示choke,否则unchoke。
m_bAmInterested:我是否对peer感兴趣,true表示感兴趣,否则不感兴趣。
m_bPeerChoking:peer是否阻塞我,true表示choke,否则unchoke。
m_bPeerInterested:peer是否对我感兴趣,true表示感兴趣,否则不感兴趣。
m_clPieceRequest:保存了我向peer请求的某个piece详情。
m_lstPeerPieceRequest:保存了peer向我请求的piece详情。
整个peer的通信流程如下:
(1)连接peer,并将其套接字加入到同步事件分离器的监视的描述符集中供其监视。
void CPeerLink::Connect(const char *IpAddr, int nPort)
{
CSocket::CreateTCPSocket();
CSocket::SetReactor(m_pPeerManager->GetTorrentTask()->GetSocketReactor());
m_strIPAddr = IpAddr;
m_nPort = nPort;
CSocket::SetHandleMask(WRITE_MASK);
CSocket::Connect(IpAddr, nPort);
m_nPeerState = PS_CONNECTING;
m_nConnTimeoutID = GetReactor()->AddTimer(this, 10000, true);
}
(2)同步事件分离器不断的轮询描述符集。
nRet = select(m_nMaxSocketFd + 1, &m_rSet, &m_wSet, NULL, &tmval);
(3)同步事件分离器检测到套接字可写,表示已经连接peer成功。调用并设置套接字可读,也就是将套接字加入到读描述符集中。
if (FD_ISSET((*it)->GetHandle(), &m_wSet))
{
int nRes = (*it)->HandleWrite();
if (nRes == -1)
{
(*it)->HandleClose();
(*it)->SetReactor(NULL);
(*it)->Close();
continue;
}
}
int CPeerLink::HandleWrite()
{
if (m_nPeerState == PS_CONNECTING)
{
m_nPeerState = PS_ESTABLISHED;
RemoveHandleMask(WRITE_MASK);
OnConnect();
return 0;
}
m_bCanWrite = true;
return 0;
}
void CPeerLink::OnConnect()
{
SetHandleMask(READ_MASK);
m_strSendBuffer.clear();
m_strRecvBuffer.clear();
m_bHandShaked = false;
m_clPieceRequest.CreatePieceRequestList(-1, 0, 0);
m_bAmChoking = true;
m_bAmInterested = false;
m_bPeerChoking = true;
m_bPeerInterested = false;
m_nDownloadCount = 0;
m_nUploadCount = 0;
m_nLastDownloadCount = 0;
m_nLastUploadCount = 0;
m_nLastUploadCount = 0;
m_nDownloadSpeed = 0;
m_nUploadSpeed = 0;
m_llLastCountSpeedTime = GetTickCount();
m_bCanRead = false;
m_bCanWrite = true;
m_pPeerManager->GetTorrentTask()->GetRateMeasure()->AddClient(this);
SendHandShake();
}
(4)发送handshake消息。
void CPeerLink::SendHandShake()
{
char szBuff[68];
memset(szBuff, 0, sizeof(szBuff));
szBuff[0] = 19;
strncpy(szBuff + 1, "BitTorrent protocol", 19);
strncpy(szBuff + 28,
(const char *) m_pPeerManager->GetTorrentTask()->GetTorrentFile()->GetInfoHash(),
20);
strncpy(szBuff + 48,
(const char *) m_pPeerManager->GetTorrentTask()->GetPeerID().c_str(),
20);
SendData(szBuff, sizeof(szBuff));
}
(5)同步事件分离器监测到套接字可读,处理各种消息,如handshake、choke、unchoke、piece等等。
if (FD_ISSET((*it)->GetHandle(), &m_rSet))
{
int nRes = (*it)->HandleRead();
if (nRes == -1)
{
(*it)->HandleClose();
(*it)->SetReactor(NULL);
(*it)->Close();
continue;
}
}
int CPeerLink::DoRead(long long llCount)
{
int nReadCount = 0;
char *pBuff = new char[RECV_BUFFER_SIZE];
for (; nReadCount < llCount;)
{
int nReadSize = RECV_BUFFER_SIZE;
if (nReadSize > llCount - nReadCount)
{
nReadSize = llCount - nReadCount;
}
int nRet = recv(GetHandle(), pBuff, nReadSize, 0);
if (nRet == 0)
{
CloseLink();
delete[] pBuff;
return nReadCount;
}
if (nRet == -1)
{
if (errno == EAGAIN)
{
m_bCanRead = false;
break;
}
if (errno == EINTR)
{
continue;
}
CloseLink();
delete[] pBuff;
return nReadCount;
}
if (nRet > 0)
{
nReadCount += nRet;
m_nDownloadCount += nRet;
m_pPeerManager->GetTorrentTask()->AddDownloadCount(nRet);
m_strRecvBuffer.append((const char *) pBuff, nRet);
}
}
delete[] pBuff;
ProcRecvData();
if (m_bCanRead)
{
if (GetHandleMask() & READ_MASK)
{
RemoveHandleMask(READ_MASK);
}
}
else
{
if (!(GetHandleMask() & READ_MASK))
{
SetHandleMask(READ_MASK);
}
}
return nReadCount;
}
关于消息的处理,以request消息为例进行介绍:
request:
int CPeerLink::ProcCmdRequest(void *pData, int nDataLen)
{
if (nDataLen != 12)
{
return -1;
}
if (m_bAmChoking)
{
return 0;
}
int nIndex = *((int *) pData);
nIndex = ntohl(nIndex);
int nOffset = *((int *) ((char *) pData + 4));
nOffset = ntohl(nOffset);
int nLen = *((int *) ((char *) pData + 8));
nLen = ntohl(nLen);
list<PeerPieceRequest>::iterator it = m_lstPeerPieceRequest.begin();
for (; it != m_lstPeerPieceRequest.end(); ++it)
{
if (it->nIndex == nIndex && it->nOffset == nOffset)
{
break;
}
}
if (it == m_lstPeerPieceRequest.end())
{
PeerPieceRequest stRequst;
stRequst.nIndex = nIndex;
stRequst.nOffset = nOffset;
stRequst.nLen = nLen;
m_lstPeerPieceRequest.push_back(stRequst);
}
DoPieceSend();
return 0;
}
这里使用m_lstPeerPieceRequest来保存peer已请求的piece,每次收到该消息后,将其保存下来。便于后面发送数据。如果消息已经发送或者收到peer发送来的cancel消息,则移除该项。
peer manager由CPeerManager类实现,主要做了以下两件事情:
(1)启动时添加两个定时器,一个用于检查peer的连接状况。另一个用于choke和unchoke peer。
bool CPeerManager::Startup()
{
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE_NP);
pthread_mutex_init(&m_MutexUnusedPeer, &attr);
pthread_mutexattr_destroy(&attr);
m_nConnectTimerID = m_pTorrentTask->GetSocketReactor()->AddTimer(this, 2000,
false);
m_nChokeTimerID = m_pTorrentTask->GetSocketReactor()->AddTimer(this, 10000,
false);
return true;
}
(2)添加peer到peer列表中。
void CPeerManager::AddPeerInfo(const char *pIpAddr, int nPort)
{
string strPeerLinkID = GenPeerLinkID(pIpAddr, nPort);
if (PeerExists(strPeerLinkID))
{
return;
}
PeerInfo stPeerInfo;
stPeerInfo.strLinkID = strPeerLinkID;
stPeerInfo.strIPAddr = pIpAddr;
stPeerInfo.nPort = nPort;
stPeerInfo.pPeerLink = NULL;
pthread_mutex_lock(&m_MutexUnusedPeer);
m_mapUnusedPeer[strPeerLinkID] = stPeerInfo;
pthread_mutex_unlock(&m_MutexUnusedPeer);
}
peer和peer manager的介绍就到此为止,大家可以去看看代码。写得比较简单,也不算很复杂的。
程序源代码下载地址:http://download.csdn.net/detail/zxywd/9415711