当前位置: 首页 > 工具软件 > BitTorrent > 使用案例 >

一个简单的BitTorrent客户端实现(六):peer manager和peer实现

空鸿云
2023-12-01

peer manager和peer

peer是整个BT通信中最复杂的部分,主要是里面各种消息的发送和一些choke和unchoke策略,piece选择策略等等。peer manager用于管理peer,本程序中维护多个peer进行远程通信。

peer实现

本程序中peer是由CPeerLink类实现的。CPeerLink类中某些重要成员说明如下:
m_bAmChoking:我是否阻塞peer,true表示choke,否则unchoke。
m_bAmInterested:我是否对peer感兴趣,true表示感兴趣,否则不感兴趣。
m_bPeerChoking:peer是否阻塞我,true表示choke,否则unchoke。
m_bPeerInterested:peer是否对我感兴趣,true表示感兴趣,否则不感兴趣。
m_clPieceRequest:保存了我向peer请求的某个piece详情。
m_lstPeerPieceRequest:保存了peer向我请求的piece详情。
整个peer的通信流程如下:
(1)连接peer,并将其套接字加入到同步事件分离器的监视的描述符集中供其监视。

void CPeerLink::Connect(const char *IpAddr, int nPort)
{
    CSocket::CreateTCPSocket();
    CSocket::SetReactor(m_pPeerManager->GetTorrentTask()->GetSocketReactor());

    m_strIPAddr = IpAddr;
    m_nPort = nPort;

    CSocket::SetHandleMask(WRITE_MASK);
    CSocket::Connect(IpAddr, nPort);
    m_nPeerState = PS_CONNECTING;

    m_nConnTimeoutID = GetReactor()->AddTimer(this, 10000, true);
}

(2)同步事件分离器不断的轮询描述符集。

nRet = select(m_nMaxSocketFd + 1, &m_rSet, &m_wSet, NULL, &tmval);

(3)同步事件分离器检测到套接字可写,表示已经连接peer成功。调用并设置套接字可读,也就是将套接字加入到读描述符集中。

            if (FD_ISSET((*it)->GetHandle(), &m_wSet))
            {
                int nRes = (*it)->HandleWrite();
                if (nRes == -1)
                {
                    (*it)->HandleClose();
                    (*it)->SetReactor(NULL);
                    (*it)->Close();
                    continue;
                }
            }
int CPeerLink::HandleWrite()
{
    if (m_nPeerState == PS_CONNECTING)
    {
        m_nPeerState = PS_ESTABLISHED;
        RemoveHandleMask(WRITE_MASK);

        OnConnect();

        return 0;
    }

    m_bCanWrite = true;
    return 0;
}

void CPeerLink::OnConnect()
{
    SetHandleMask(READ_MASK);

    m_strSendBuffer.clear();
    m_strRecvBuffer.clear();
    m_bHandShaked = false;

    m_clPieceRequest.CreatePieceRequestList(-1, 0, 0);

    m_bAmChoking = true;
    m_bAmInterested = false;
    m_bPeerChoking = true;
    m_bPeerInterested = false;

    m_nDownloadCount = 0;
    m_nUploadCount = 0;
    m_nLastDownloadCount = 0;
    m_nLastUploadCount = 0;
    m_nLastUploadCount = 0;
    m_nDownloadSpeed = 0;
    m_nUploadSpeed = 0;

    m_llLastCountSpeedTime = GetTickCount();

    m_bCanRead = false;
    m_bCanWrite = true;

    m_pPeerManager->GetTorrentTask()->GetRateMeasure()->AddClient(this);

    SendHandShake();
}

(4)发送handshake消息。

void CPeerLink::SendHandShake()
{
    char szBuff[68];
    memset(szBuff, 0, sizeof(szBuff));

    szBuff[0] = 19;
    strncpy(szBuff + 1, "BitTorrent protocol", 19);
    strncpy(szBuff + 28,
            (const char *) m_pPeerManager->GetTorrentTask()->GetTorrentFile()->GetInfoHash(),
            20);
    strncpy(szBuff + 48,
            (const char *) m_pPeerManager->GetTorrentTask()->GetPeerID().c_str(),
            20);

    SendData(szBuff, sizeof(szBuff));
}

(5)同步事件分离器监测到套接字可读,处理各种消息,如handshake、choke、unchoke、piece等等。

            if (FD_ISSET((*it)->GetHandle(), &m_rSet))
            {
               int nRes = (*it)->HandleRead();
               if (nRes == -1)
               {
                   (*it)->HandleClose();
                   (*it)->SetReactor(NULL);
                   (*it)->Close();
                   continue;
               }
            }
int CPeerLink::DoRead(long long llCount)
{
    int nReadCount = 0;
    char *pBuff = new char[RECV_BUFFER_SIZE];

    for (; nReadCount < llCount;)
    {
        int nReadSize = RECV_BUFFER_SIZE;
        if (nReadSize > llCount - nReadCount)
        {
            nReadSize = llCount - nReadCount;
        }

        int nRet = recv(GetHandle(), pBuff, nReadSize, 0);
        if (nRet == 0)
        {
            CloseLink();
            delete[] pBuff;
            return nReadCount;
        }

        if (nRet == -1)
        {
            if (errno == EAGAIN)
            {
                m_bCanRead = false;
                break;
            }

            if (errno == EINTR)
            {
                continue;
            }

            CloseLink();
            delete[] pBuff;
            return nReadCount;
        }

        if (nRet > 0)
        {
            nReadCount += nRet;
            m_nDownloadCount += nRet;
            m_pPeerManager->GetTorrentTask()->AddDownloadCount(nRet);

            m_strRecvBuffer.append((const char *) pBuff, nRet);
        }
    }

    delete[] pBuff;

    ProcRecvData();

    if (m_bCanRead)
    {
        if (GetHandleMask() & READ_MASK)
        {
            RemoveHandleMask(READ_MASK);
        }
    }
    else
    {
        if (!(GetHandleMask() & READ_MASK))
        {
            SetHandleMask(READ_MASK);
        }
    }

    return nReadCount;
}

关于消息的处理,以request消息为例进行介绍:
request:

int CPeerLink::ProcCmdRequest(void *pData, int nDataLen)
{
    if (nDataLen != 12)
    {
        return -1;
    }

    if (m_bAmChoking)
    {
        return 0;
    }

    int nIndex = *((int *) pData);
    nIndex = ntohl(nIndex);
    int nOffset = *((int *) ((char *) pData + 4));
    nOffset = ntohl(nOffset);
    int nLen = *((int *) ((char *) pData + 8));
    nLen = ntohl(nLen);

    list<PeerPieceRequest>::iterator it = m_lstPeerPieceRequest.begin();
    for (; it != m_lstPeerPieceRequest.end(); ++it)
    {
        if (it->nIndex == nIndex && it->nOffset == nOffset)
        {
            break;
        }
    }

    if (it == m_lstPeerPieceRequest.end())
    {
        PeerPieceRequest stRequst;
        stRequst.nIndex = nIndex;
        stRequst.nOffset = nOffset;
        stRequst.nLen = nLen;

        m_lstPeerPieceRequest.push_back(stRequst);
    }

    DoPieceSend();

    return 0;
}

这里使用m_lstPeerPieceRequest来保存peer已请求的piece,每次收到该消息后,将其保存下来。便于后面发送数据。如果消息已经发送或者收到peer发送来的cancel消息,则移除该项。

peer manager实现

peer manager由CPeerManager类实现,主要做了以下两件事情:
(1)启动时添加两个定时器,一个用于检查peer的连接状况。另一个用于choke和unchoke peer。

bool CPeerManager::Startup()
{
    pthread_mutexattr_t attr;
    pthread_mutexattr_init(&attr);
    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE_NP);
    pthread_mutex_init(&m_MutexUnusedPeer, &attr);
    pthread_mutexattr_destroy(&attr);

    m_nConnectTimerID = m_pTorrentTask->GetSocketReactor()->AddTimer(this, 2000,
            false);

    m_nChokeTimerID = m_pTorrentTask->GetSocketReactor()->AddTimer(this, 10000,
            false);

    return true;
}

(2)添加peer到peer列表中。

void CPeerManager::AddPeerInfo(const char *pIpAddr, int nPort)
{
    string strPeerLinkID = GenPeerLinkID(pIpAddr, nPort);
    if (PeerExists(strPeerLinkID))
    {
        return;
    }

    PeerInfo stPeerInfo;
    stPeerInfo.strLinkID = strPeerLinkID;
    stPeerInfo.strIPAddr = pIpAddr;
    stPeerInfo.nPort = nPort;
    stPeerInfo.pPeerLink = NULL;

    pthread_mutex_lock(&m_MutexUnusedPeer);
    m_mapUnusedPeer[strPeerLinkID] = stPeerInfo;
    pthread_mutex_unlock(&m_MutexUnusedPeer);
}

peer和peer manager的介绍就到此为止,大家可以去看看代码。写得比较简单,也不算很复杂的。
程序源代码下载地址:http://download.csdn.net/detail/zxywd/9415711

 类似资料: