当前位置: 首页 > 工具软件 > KCP > 使用案例 >

C/C++ 对于KCP控制协议的封装对象

凌经赋
2023-12-01

Xtcp.h 

#pragma once

#include <frp/IDisposable.h>
#include <frp/net/IPEndPoint.h>
#include <frp/threading/Hosting.h>

namespace frp {
    namespace xtcp {
        /*
         * Message channel that drives a KCP protocol stack (ikcp) over a UDP socket.
         *
         * The socket is supplied by the caller and held here as a raw pointer. Outgoing
         * payloads are copied into messages_ and handed to KCP by the flush step, incoming
         * datagrams enter through Input(), and Open() starts a 1 ms deadline timer that
         * calls Update() repeatedly while the channel stays available.
         */
        class Xtcp : public IDisposable {
        public:
            /* Read completion: receives the byte count returned by KCP, or -1 when the read is cancelled. */
            typedef std::function<void(int)>                        ReadAsyncCallback;
            /* Write completion: true once KCP accepted the payload, false when the queue is dropped on teardown. */
            typedef std::function<void(bool)>                       WriteAsyncCallback;

        private:
            /* One queued outgoing payload (a private copy of the caller's bytes) plus its completion handler. */
            struct message {
                std::shared_ptr<Byte>                               packet;
                int                                                 packet_size;
                WriteAsyncCallback                                  callback;
            };
            typedef std::shared_ptr<message>                        pmessage;
            typedef std::list<pmessage>                             message_queue;

        public:
            /* Creates the KCP control block for `channel`, opening `socket` first when it is not open yet. */
            Xtcp(boost::asio::ip::udp::socket& socket, int channel, const boost::asio::ip::udp::endpoint& remoteEP) noexcept;
            /* Runs the same teardown as Dispose(). */
            ~Xtcp() noexcept;

        public:
            /* Registers the single pending read into buffer+offset; returns false on bad arguments, when unavailable, or when a read is already pending. */
            virtual bool                                            ReadAsync(const void* buffer, int offset, int length, const ReadAsyncCallback& callback) noexcept;
            /* Copies `length` bytes from buffer+offset into the send queue and tries to flush it; the callback may be empty. */
            virtual bool                                            WriteAsync(const void* buffer, int offset, int length, const WriteAsyncCallback& callback) noexcept;
            /* Tears the channel down once; later calls are no-ops. */
            virtual void                                            Dispose() noexcept override;
            /* Completes a pending read, flushes queued writes, then advances KCP with the low 32 bits of `now`. */
            virtual void                                            Update(UInt64 now) noexcept;
            /* Clock value passed to Update() by the internal timer. */
            virtual UInt64                                          CurrentMillisec() noexcept;

        public:
            /* True while the socket is open, the object is not disposed, Open() has been called and the KCP block exists. */
            virtual bool                                            IsAvailable() noexcept;
            inline frp::net::AddressFamily                          GetAddressFamily() noexcept { return af_; }
            inline int                                              GetChannel() noexcept { return channel_; }
            /* Raw socket pointer; NULL after the channel has been closed. */
            inline boost::asio::ip::udp::socket*                    GetSocket() noexcept { return socket_.load(); }
            inline boost::asio::ip::udp::endpoint&                  GetRemoteEndPoint() noexcept { return remoteEP_; }
            /* Stores the peer address after passing it through IPEndPoint::Transform with this channel's address family. */
            Xtcp&                                                   SetRemoteEndPoint(const boost::asio::ip::udp::endpoint& remoteEP) noexcept;
            /* Local endpoint of the socket, or a default-constructed IPEndPoint converted to an endpoint when it cannot be queried. */
            boost::asio::ip::udp::endpoint                          GetLocalEndPoint() noexcept;

        public:
            /* Starts one asynchronous receive that feeds Input() and re-arms itself; requires a prior Open(). */
            virtual bool                                            Listen() noexcept;
            /* Marks the channel open and starts the update timer; returns false when already open, disposed, or the socket is closed. */
            virtual bool                                            Open() noexcept;
            /* Same as Dispose(). */
            void                                                    Close() noexcept;
            /* Calls ikcp_nodelay with nodelay mode 2 (true) or 0 (false), interval 10, resend 2, nc 1. */
            bool                                                    NoDelay(bool value) noexcept;
            /* KCP rx_minrto, or -1 when the KCP block is gone. */
            int                                                     GetMinRetransmissionTimeout() noexcept;
            /* Sets KCP rx_minrto; values below 10 are raised to 10. */
            bool                                                    SetMinRetransmissionTimeout(int timeout) noexcept;

        public:
            /* Supplies the receive buffer used by Listen(); `length` receives the size passed to the socket read. */
            virtual std::shared_ptr<Byte>                           GetBuffer(int& length) = 0;
            /* Feeds one received datagram to KCP; on success records `remoteEP` as the peer and runs the read/flush steps. */
            virtual bool                                            Input(const void* buffer, int offset, int length, const boost::asio::ip::udp::endpoint& remoteEP) noexcept;
            /* Sends raw bytes to `destinationEP` with a synchronous send_to; used as the KCP output hook. */
            virtual bool                                            Output(const void* buffer, int offset, int length, const boost::asio::ip::udp::endpoint& destinationEP) noexcept;

        private:
            /* Arms the 1 ms timer that calls Update() and then re-arms itself. */
            bool                                                    Timeout() noexcept;
            /* Tries to satisfy the pending read from KCP; `state` is the ikcpcb pointer. */
            bool                                                    TryReadNetStack(void* state) noexcept;
            /* Hands queued messages to KCP in FIFO order; `state` is the ikcpcb pointer. */
            void                                                    TryFlushNetStack(void* state) noexcept;
            /* Full teardown: KCP block, socket, timer, pending read, queued writes. */
            void                                                    Finalize() noexcept;
            void                                                    CloseNetStack() noexcept;
            void                                                    CloseSocket() noexcept;
            void                                                    CancelTimeout() noexcept;
            /* Completes the pending read, if any, with -1. */
            void                                                    CancelReadAsync() noexcept;
            /* Empties the send queue, completing every queued write with false. */
            void                                                    ClearAllMessages() noexcept;

        private:
            std::atomic<bool>                                       disposed_;
            std::atomic<bool>                                       open_;
            std::atomic<bool>                                       listen_;   /* true while a receive started by Listen() is outstanding */
            std::atomic<void*>                                      kcp_;      /* ikcpcb*, stored untyped; NULL after CloseNetStack() */
            std::atomic<boost::asio::ip::udp::socket*>              socket_;   /* caller-supplied socket; NULL after CloseSocket() */
            frp::net::AddressFamily                                 af_;
            int                                                     channel_;  
            boost::asio::ip::udp::endpoint                          sourceEP_; /* sender of the datagram most recently received by Listen() */
            boost::asio::ip::udp::endpoint                          remoteEP_;
            boost::asio::deadline_timer                             timeout_;
            message_queue                                           messages_;
            /* Read Async Control Block: the single pending ReadAsync request; buffer == NULL means none. */
            struct {
                char*                                               buffer;
                int                                                 length;
                ReadAsyncCallback                                   callback;
            }                                                       readACB_;
        };

        /*
         * Xtcp variant that keeps its Hosting, io_context and UDP socket alive through
         * shared_ptr members and serves the Listen() receive buffer from Hosting.
         */
        class XtcpClient : public Xtcp {
        public:
            /* Uses an existing socket; every other constructor delegates to this one. */
            XtcpClient(const std::shared_ptr<frp::threading::Hosting>& hosting, const std::shared_ptr<boost::asio::io_context>& context, const std::shared_ptr<boost::asio::ip::udp::socket>& socket, int channel, const boost::asio::ip::udp::endpoint& remoteEP) noexcept;
            /* Creates a fresh, unopened socket on `context`; the Xtcp constructor opens it. */
            XtcpClient(const std::shared_ptr<frp::threading::Hosting>& hosting, const std::shared_ptr<boost::asio::io_context>& context, int channel, const boost::asio::ip::udp::endpoint& remoteEP) noexcept;
            /* Creates the socket with CreateSocket(context, localEP). */
            XtcpClient(const std::shared_ptr<frp::threading::Hosting>& hosting, const std::shared_ptr<boost::asio::io_context>& context, int channel, const boost::asio::ip::udp::endpoint& localEP, const boost::asio::ip::udp::endpoint& remoteEP) noexcept;
            /* Converts host/port with ToEndPoint() and uses the result as the remote endpoint. */
            XtcpClient(const std::shared_ptr<frp::threading::Hosting>& hosting, const std::shared_ptr<boost::asio::io_context>& context, int channel, const std::string& host, int port) noexcept;

        public:
            /* Returns the owning shared_ptr; this non-virtual overload hides Xtcp::GetSocket(), which returns a raw pointer. */
            std::shared_ptr<boost::asio::ip::udp::socket>           GetSocket() noexcept;
            std::shared_ptr<boost::asio::io_context>                GetContext() noexcept;
            std::shared_ptr<frp::threading::Hosting>                GetHosting() noexcept;
            /* Returns the buffer obtained from Hosting at construction and reports Hosting::BufferSize as its length. */
            virtual std::shared_ptr<Byte>                           GetBuffer(int& length) noexcept override;

        public:
            /* Builds a UDP endpoint from Ipep::GetEndPoint(host, port). */
            static boost::asio::ip::udp::endpoint                   ToEndPoint(const std::string host, int port) noexcept;
            /* Creates a socket on `context` and opens it on localEP; returns NULL when `context` is null or opening fails. */
            static std::shared_ptr<boost::asio::ip::udp::socket>    CreateSocket(const std::shared_ptr<boost::asio::io_context>& context, const boost::asio::ip::udp::endpoint& localEP) noexcept;

        private:
            std::shared_ptr<frp::threading::Hosting>                hosting_;
            std::shared_ptr<boost::asio::io_context>                context_;
            std::shared_ptr<boost::asio::ip::udp::socket>           socket_;   /* distinct from the private Xtcp::soc_ raw pointer of the same name in the base */
            std::shared_ptr<Byte>                                   buffer_;
        };
    }
}

Xtcp.cpp

#include <frp/xtcp/Xtcp.h>
#include <frp/xtcp/ikcp.h>
#include <frp/net/Ipep.h>
#include <frp/net/Socket.h>

using frp::net::AddressFamily;
using frp::net::Ipep;
using frp::net::IPEndPoint;

namespace frp {
    namespace xtcp {
        /*
         * Installs the project allocator (Malloc/Mfree) as the allocation hooks of the
         * KCP library. The work happens in the constructor, so defining one static
         * instance is enough to register the hooks during static initialization.
         */
        struct IKcpAllocator {
            inline IKcpAllocator() noexcept {
                ikcp_allocator(
                    [](size_t size) noexcept {
                        return Malloc(size);
                    },
                    [](void* memory) noexcept {
                        Mfree(memory);
                    });
            }
        };

        /*
         * Set the default memory allocator for Xtcp globally.
         * The instance name avoids double underscores, which C++ reserves for the implementation.
         * NOTE(review): KCP objects created by static initializers that run before this one
         * would be allocated without these hooks - confirm nothing does that.
         */
        static IKcpAllocator g_XtcpAllocator;

        /*
         * Builds a KCP channel on top of a caller-supplied UDP socket.
         *
         * socket   - socket used for all traffic; only its address is stored. It is opened
         *            here when it is not open yet.
         * channel  - value passed to ikcp_create as the conversation id.
         * remoteEP - initial peer address.
         *
         * Members are initialized in declaration order, and af_ gets its default in the
         * initializer list so it is never read uninitialized.
         */
        Xtcp::Xtcp(boost::asio::ip::udp::socket& socket, int channel, const boost::asio::ip::udp::endpoint& remoteEP) noexcept
            : disposed_(false)
            , open_(false)
            , listen_(false)
            , kcp_(NULL)
            , socket_(addressof(socket))
            , af_(AddressFamily::InterNetwork)
            , channel_(channel)
            , timeout_(socket.get_executor()) {
            /* Initialize the Read Async Control Block: no read is pending. */
            readACB_.buffer = NULL;
            readACB_.length = 0;

            /* If the socket is not opened, open it on the IPv6 loopback address when the peer is loopback, otherwise on the IPv6 wildcard. */
            if (!socket.is_open()) {
                boost::asio::ip::address address = remoteEP.address();
                if (address.is_loopback()) {
                    address = boost::asio::ip::address_v6::loopback();
                }
                else {
                    address = boost::asio::ip::address_v6::any();
                }
                frp::net::Socket::OpenSocket(socket, address, IPEndPoint::MinPort);
            }

            /* Create the KCP control block; `this` becomes the user pointer handed back to the output hook. */
            ikcpcb* kcp = ikcp_create(channel, this);
            if (kcp) {
                /* Sets the function properties of the KCP protocol stack. */
                ikcp_setmtu(kcp, 1400);
                ikcp_wndsize(kcp, 128, 128);

                /* Output hook: forward every packet KCP emits to the current remote endpoint. */
                /* Reports 0 on success and -1 on failure (the previous code had the two swapped). */
                ikcp_setoutput(kcp,
                    [](const char* buf, int len, ikcpcb* kcp, void* user) noexcept {
                        Xtcp* sender = static_cast<Xtcp*>(user);
                        return sender->Output(buf, 0, len, sender->GetRemoteEndPoint()) ? 0 : -1;
                    });
            }
            kcp_.store(kcp);

            /* Start with nodelay mode 0; NoDelay() is a no-op when the KCP block could not be created. */
            NoDelay(false);

            /* Determines the IP protocol family of the socket from its local endpoint; stays InterNetwork when it cannot be queried. */
            boost::system::error_code ec;
            boost::asio::ip::udp::endpoint localEP = socket.local_endpoint(ec);
            if (!ec) {
                boost::asio::ip::address localIP = localEP.address();
                if (localIP.is_v6()) {
                    af_ = AddressFamily::InterNetworkV6;
                }
            }

            /* Marks the current remote endpoint address (must run after af_ is final, because it is transformed with af_). */
            SetRemoteEndPoint(remoteEP);
        }

        /* Runs the full teardown on destruction, whether or not Dispose() was called before. */
        Xtcp::~Xtcp() noexcept {
            Finalize();
        }

        /*
         * Tears the channel down: releases the KCP block, closes the socket, cancels the
         * update timer, then fails the pending read (with -1) and every queued write (with false).
         * The KCP block and socket pointers are cleared first, so IsAvailable() is already
         * false by the time the user callbacks run.
         */
        void Xtcp::Finalize() noexcept {
            CloseNetStack();
            CloseSocket();
            CancelTimeout();
            CancelReadAsync();
            ClearAllMessages();
        }

        /* Alias for Dispose(). */
        void Xtcp::Close() noexcept {
            Dispose();
        }

        /* Tears the channel down exactly once; the atomic flag makes repeated calls no-ops. */
        void Xtcp::Dispose() noexcept {
            const bool alreadyDisposed = disposed_.exchange(true);
            if (alreadyDisposed) {
                return;
            }

            Finalize();
        }

        bool Xtcp::Open() noexcept {
            boost::asio::ip::udp::socket* socket = socket_.load();
            if (!socket) {
                return false;
            }

            if (!socket->is_open()) {
                return false;
            }

            if (disposed_) {
                return false;
            }

            if (open_.exchange(true)) {
                return false;
            }

            return Timeout();
        }

        /*
         * Starts one asynchronous receive on the socket. When it completes, a non-empty
         * datagram is passed to Input() together with its sender, and Listen() is called
         * again to keep receiving.
         *
         * Returns false when the socket is missing or closed, the object is disposed,
         * Open() has not been called, a receive is already outstanding, or GetBuffer()
         * does not provide a usable buffer.
         */
        bool Xtcp::Listen() noexcept {
            boost::asio::ip::udp::socket* socket = socket_.load();
            if (!socket || !socket->is_open()) {
                return false;
            }

            if (disposed_ || !open_) {
                return false;
            }

            /* Allow a single outstanding receive. */
            if (listen_.exchange(true)) {
                return false;
            }

            /* Never hand a null or empty buffer to the socket; release the listen flag so a later call can retry. */
            int buffer_size = 0;
            std::shared_ptr<Byte> buffer = GetBuffer(buffer_size);
            if (!buffer || buffer_size < 1) {
                listen_.store(false);
                return false;
            }

            /* The captured reference and buffer are held by the handler until it has run. */
            std::shared_ptr<Reference> reference = GetReference();
            socket->async_receive_from(boost::asio::buffer(buffer.get(), buffer_size), sourceEP_,
                [reference, this, buffer](const boost::system::error_code& ec, std::size_t sz) noexcept {
                    if (listen_.exchange(false)) {
                        /* sz never exceeds buffer_size, so it fits in an int. */
                        if (!ec && sz > 0) {
                            Input(buffer.get(), 0, static_cast<int>(sz), sourceEP_);
                        }

                        /* Re-arm; this returns false by itself once the socket is closed or the object disposed. */
                        Listen();
                    }
                });
            return true;
        }

        bool Xtcp::Timeout() noexcept {
            if (!IsAvailable()) {
                return false;
            }

            std::shared_ptr<Reference> reference = GetReference();
            timeout_.expires_from_now(boost::posix_time::milliseconds(1));
            timeout_.async_wait(
                [reference, this](const boost::system::error_code& ec) noexcept {
                    if (!ec) {
                        Update(CurrentMillisec());
                        Timeout();
                    }
                });
            return true;
        }

        bool Xtcp::NoDelay(bool value) noexcept {
            ikcpcb* kcp = (ikcpcb*)kcp_.load();
            if (!kcp) {
                return false;
            }

            int nodelay = value ? 2 : 0;
            int hr = ikcp_nodelay(kcp, nodelay, 10, 2, 1);
            if (hr < 0) {
                return false;
            }

            return true;
        }

        int Xtcp::GetMinRetransmissionTimeout() noexcept {
            ikcpcb* kcp = (ikcpcb*)kcp_.load();
            if (!kcp) {
                return -1;
            }
            return kcp->rx_minrto;
        }

        bool Xtcp::SetMinRetransmissionTimeout(int timeout) noexcept {
            if (timeout < 10) {
                timeout = 10;
            }

            ikcpcb* kcp = (ikcpcb*)kcp_.load();
            if (!kcp) {
                return false;
            }

            kcp->rx_minrto = timeout;
            return true;
        }

        /* Clock source for the update timer: the value of GetTickCount(). Virtual, so a subclass can substitute another clock. */
        UInt64 Xtcp::CurrentMillisec() noexcept {
            return GetTickCount();
        }

        /*
         * One tick of the channel: complete a pending read if KCP has data, hand queued
         * writes to KCP, then let KCP run its own timers with the low 32 bits of `now`.
         *
         * The read and write steps invoke user callbacks, and a callback may call
         * Dispose(), which releases the KCP block. The pointer is therefore loaded again
         * before every step instead of being cached across the callbacks.
         */
        void Xtcp::Update(UInt64 now) noexcept {
            if (!kcp_.load()) {
                return;
            }

            /* Both helpers accept a null state and do nothing with it. */
            TryReadNetStack(kcp_.load());
            TryFlushNetStack(kcp_.load());

            ikcpcb* kcp = (ikcpcb*)kcp_.load();
            if (kcp) {
                ikcp_update(kcp, now & 0xfffffffful);
            }
        }

        /*
         * Sends `length` bytes starting at buffer+offset to `destinationEP` with a
         * synchronous send_to. Returns false on invalid arguments, when the socket is
         * missing or closed, or when send_to reports an error.
         */
        bool Xtcp::Output(const void* buffer, int offset, int length, const boost::asio::ip::udp::endpoint& destinationEP) noexcept {
            const bool validArguments = buffer && offset >= 0 && length >= 1;
            if (!validArguments) {
                return false;
            }

            /* Load the socket pointer once; it is NULL after the channel has been closed. */
            boost::asio::ip::udp::socket* const sock = socket_.load();
            if (!sock || !sock->is_open()) {
                return false;
            }

            boost::system::error_code error;
            char* const payload = (char*)buffer + offset;
            sock->send_to(boost::asio::buffer(payload, length), destinationEP, 0, error);
            if (error) {
                return false;
            }
            return true;
        }

        void Xtcp::CloseNetStack() noexcept {
            ikcpcb* kcp = (ikcpcb*)kcp_.exchange(NULL);
            if (kcp) {
                ikcp_release(kcp);
            }
        }

        void Xtcp::CloseSocket() noexcept {
            boost::asio::ip::udp::socket* socket = socket_.exchange(NULL);
            if (socket) {
                frp::net::Socket::Closesocket(*socket);
            }
        }

        /* Cancels the update timer through Hosting::Cancel. */
        void Xtcp::CancelTimeout() noexcept {
            frp::threading::Hosting::Cancel(timeout_);
        }

        void Xtcp::ClearAllMessages() noexcept {
            std::vector<pmessage> messages;
            message_queue::iterator tail = messages_.begin();
            message_queue::iterator endl = messages_.end();
            for (; tail != endl; tail++) {
                messages.push_back(std::move(*tail));
            }

            messages_.clear();
            for (std::size_t index = 0, length = messages.size(); index < length; index++) {
                pmessage message = std::move(messages[index]);
                if (!message) {
                    continue;
                }

                WriteAsyncCallback& handler = message->callback;
                if (handler) {
                    handler(false);
                }
            }
        }

        void Xtcp::CancelReadAsync() noexcept {
            ReadAsyncCallback handler = std::move(readACB_.callback);
            readACB_.buffer = NULL;
            readACB_.length = 0;
            readACB_.callback = NULL;

            if (handler) {
                handler(-1);
            }
        }

        void Xtcp::TryFlushNetStack(void* state) noexcept {
            /* Check whether the peer address is valid. */
            boost::asio::ip::address destinationAddress = remoteEP_.address();
            if (destinationAddress.is_multicast() || destinationAddress.is_unspecified()) {
                return;
            }

            /* Check whether the peer port is valid. */
            int destinationPort = remoteEP_.port();
            if (destinationPort <= IPEndPoint::MinPort || destinationPort > IPEndPoint::MaxPort) {
                return;
            }

            /* The loop attempts to send segmented data to the peer. */
            ikcpcb* kcp = (ikcpcb*)state;
            while (kcp) { /* FIFO */
                message_queue::iterator tail = messages_.begin();
                message_queue::iterator endl = messages_.end();
                if (tail == endl) {
                    break;
                }

                pmessage& message = *tail;
                if (message) {
                    int hr = ikcp_send(kcp, (char*)message->packet.get(), message->packet_size);
                    if (hr < 0) {
                        break;
                    }

                    WriteAsyncCallback& handler = message->callback;
                    if (handler) {
                        handler(true);
                    }
                }

                messages_.erase(tail);
            }
        }

        bool Xtcp::TryReadNetStack(void* state) noexcept {
            /* The state pointer must be a handle to KCP. */
            ikcpcb* kcp = (ikcpcb*)state;
            if (!kcp) {
                return false;
            }

            /* Check whether an asynchronous retrieval task exists. */
            char* buffer = readACB_.buffer;
            if (!buffer) {
                return false;
            }

            /* Attempts to receive data from the protocol stack cache. */
            int len = ikcp_recv(kcp, buffer, readACB_.length);
            if (len < 0) {
                return true;
            }

            /* Reset the Read Async Control Block. */
            ReadAsyncCallback handler = std::move(readACB_.callback);
            readACB_.buffer = NULL;
            readACB_.length = 0;
            readACB_.callback = NULL;

            /* Callback this asynchronous data acquisition. */
            if (handler) {
                handler(len);
            }
            return true;
        }

        /*
         * Feeds one received datagram (buffer+offset, `length` bytes) into KCP.
         *
         * Returns false on invalid arguments, when the KCP block is gone, or when ikcp_input
         * rejects the data. On success `remoteEP` becomes the current peer address and the
         * pending read and queued writes are serviced.
         */
        bool Xtcp::Input(const void* buffer, int offset, int length, const boost::asio::ip::udp::endpoint& remoteEP) noexcept {
            if (!buffer || offset < 0 || length < 1) {
                return false;
            }

            ikcpcb* kcp = (ikcpcb*)kcp_.load();
            if (!kcp) {
                return false;
            }

            int hr = ikcp_input(kcp, (char*)buffer + offset, length);
            if (hr < 0) {
                return false;
            }

            /* UDP/IP port mapped by the carrier's network router to the user's dynamic NAT may change, but it does not change for the local broadband user's port. */
            /* So the stored peer address is overwritten every time a datagram is accepted. */
            SetRemoteEndPoint(remoteEP);

            /* The read handler may call Dispose() and release the KCP block, so the pointer is */
            /* loaded again for each step instead of reusing the one obtained above. */
            TryReadNetStack(kcp_.load());
            TryFlushNetStack(kcp_.load());
            return true;
        }

        bool Xtcp::IsAvailable() noexcept {
            boost::asio::ip::udp::socket* socket = socket_.load();
            if (!socket) {
                return false;
            }

            if (!socket->is_open()) {
                return false;
            }

            if (disposed_) {
                return false;
            }

            if (!open_) {
                return false;
            }

            return NULL != kcp_.load();
        }

        /*
         * Returns the socket's local endpoint. When the socket is detached, closed, or the
         * query fails, returns a default-constructed IPEndPoint converted to a UDP endpoint.
         */
        boost::asio::ip::udp::endpoint Xtcp::GetLocalEndPoint() noexcept {
            boost::asio::ip::udp::socket* const sock = socket_.load();
            if (sock && sock->is_open()) {
                boost::system::error_code error;
                boost::asio::ip::udp::endpoint bound = sock->local_endpoint(error);
                if (!error) {
                    return bound;
                }
            }

            return IPEndPoint::ToEndPoint<boost::asio::ip::udp>(IPEndPoint());
        }

        /*
         * Stores the peer address after passing it through IPEndPoint::Transform together
         * with this channel's address family (presumably so the stored address matches the
         * socket's family - confirm against IPEndPoint::Transform). Returns *this for chaining.
         */
        Xtcp& Xtcp::SetRemoteEndPoint(const boost::asio::ip::udp::endpoint& remoteEP) noexcept {
            remoteEP_ = IPEndPoint::Transform(af_, remoteEP);
            return *this;
        }

        /*
         * Registers a read of up to `length` bytes into buffer+offset and immediately tries
         * to satisfy it from KCP. Only one read may be pending at a time.
         *
         * Returns false on invalid arguments (including an empty callback), when the channel
         * is not available, or when another read is already pending. The callback receives
         * the number of bytes read, or -1 if the read is cancelled.
         */
        bool Xtcp::ReadAsync(const void* buffer, int offset, int length, const ReadAsyncCallback& callback) noexcept {
            const bool invalidArguments = !buffer || offset < 0 || length < 1 || !callback;
            if (invalidArguments || !IsAvailable()) {
                return false;
            }

            /* Multiple pending asynchronous reads are not allowed at the same time. */
            if (readACB_.buffer != NULL) {
                return false;
            }

            /* Fill in the control block of the asynchronous read task. */
            readACB_.callback = callback;
            readACB_.length = length;
            readACB_.buffer = (char*)buffer + offset;

            return TryReadNetStack(kcp_.load());
        }

        bool Xtcp::WriteAsync(const void* buffer, int offset, int length, const WriteAsyncCallback& callback) noexcept {
            if (!buffer || offset < 0 || length < 1) {
                return false;
            }

            ikcpcb* kcp = (ikcpcb*)kcp_.load();
            if (!IsAvailable()) {
                return false;
            }

            std::shared_ptr<Byte> messages = make_shared_alloc<Byte>(length);
            memcpy(messages.get(), ((Byte*)buffer) + offset, length);

            /* Insert the flow controlled data segment into the send queues. */
            pmessage message_ = make_shared_object<message>();
            message_->packet = messages;
            message_->packet_size = length;
            message_->callback = callback;
            messages_.push_back(message_);

            /* Attempt to flush all message accumulated in the message queues. */
            TryReadNetStack(kcp);
            TryFlushNetStack(kcp);
            return true;
        }

        /*
         * Primary constructor: builds the channel on an existing socket and keeps the
         * hosting, context and socket alive through shared_ptr members. The receive
         * buffer is taken from `hosting` once, here.
         * Both `socket` and `hosting` are dereferenced without a null check.
         */
        XtcpClient::XtcpClient(const std::shared_ptr<frp::threading::Hosting>& hosting, const std::shared_ptr<boost::asio::io_context>& context, const std::shared_ptr<boost::asio::ip::udp::socket>& socket, int channel, const boost::asio::ip::udp::endpoint& remoteEP) noexcept
            : Xtcp(*socket, channel, remoteEP)
            , hosting_(hosting)
            , context_(context)
            , socket_(socket)
            , buffer_(hosting->GetBuffer()) {

        }

        /*
         * Creates a new, unopened socket on `context` and delegates to the socket-taking
         * constructor; the Xtcp constructor opens the socket because it is not open yet.
         * `context` is dereferenced without a null check.
         */
        XtcpClient::XtcpClient(const std::shared_ptr<frp::threading::Hosting>& hosting, const std::shared_ptr<boost::asio::io_context>& context, int channel, const boost::asio::ip::udp::endpoint& remoteEP) noexcept
            : XtcpClient(hosting, context, make_shared_object<boost::asio::ip::udp::socket>(*context), channel, remoteEP) {

        }

        /*
         * Creates the socket with CreateSocket(context, localEP) and delegates to the
         * socket-taking constructor.
         * NOTE(review): CreateSocket() returns NULL when `context` is null or the socket
         * cannot be opened, and the delegated constructor dereferences the socket
         * unconditionally - callers apparently must guarantee that localEP can be opened; confirm.
         */
        XtcpClient::XtcpClient(const std::shared_ptr<frp::threading::Hosting>& hosting, const std::shared_ptr<boost::asio::io_context>& context, int channel, const boost::asio::ip::udp::endpoint& localEP, const boost::asio::ip::udp::endpoint& remoteEP) noexcept
            : XtcpClient(hosting, context, CreateSocket(context, localEP), channel, remoteEP) {

        }

        /* Converts host/port to an endpoint with ToEndPoint() and delegates to the (channel, remoteEP) constructor. */
        XtcpClient::XtcpClient(const std::shared_ptr<frp::threading::Hosting>& hosting, const std::shared_ptr<boost::asio::io_context>& context, int channel, const std::string& host, int port) noexcept
            : XtcpClient(hosting, context, channel, ToEndPoint(host, port)) {

        }

        /* Returns the io_context passed at construction. */
        std::shared_ptr<boost::asio::io_context> XtcpClient::GetContext() noexcept {
            return context_;
        }

        /* Returns the owning pointer to the socket. Unlike Xtcp::GetSocket(), this member is not cleared by CloseSocket(). */
        std::shared_ptr<boost::asio::ip::udp::socket> XtcpClient::GetSocket() noexcept {
            return socket_;
        }

        /* Returns the Hosting object passed at construction. */
        std::shared_ptr<frp::threading::Hosting> XtcpClient::GetHosting() noexcept {
            return hosting_;
        }

        /*
         * Receive buffer for Listen(): returns the buffer taken from Hosting at construction
         * and reports Hosting::BufferSize as its length.
         * NOTE(review): assumes Hosting::GetBuffer() provides at least BufferSize bytes - confirm.
         */
        std::shared_ptr<Byte> XtcpClient::GetBuffer(int& length) noexcept {
            length = frp::threading::Hosting::BufferSize;
            return buffer_;
        }

        /* Converts host/port into a UDP endpoint: Ipep::GetEndPoint builds the IPEndPoint, IPEndPoint::ToEndPoint converts it. */
        boost::asio::ip::udp::endpoint XtcpClient::ToEndPoint(const std::string host, int port) noexcept {
            return IPEndPoint::ToEndPoint<boost::asio::ip::udp>(Ipep::GetEndPoint(host, port));
        }

        /*
         * Creates a UDP socket on `context` and opens it on localEP's address and port.
         * A multicast local address is replaced by the IPv6 wildcard address.
         * Returns an empty pointer when `context` is null, the socket object cannot be
         * created, or the socket cannot be opened.
         */
        std::shared_ptr<boost::asio::ip::udp::socket> XtcpClient::CreateSocket(const std::shared_ptr<boost::asio::io_context>& context, const boost::asio::ip::udp::endpoint& localEP) noexcept {
            if (!context) {
                return nullptr;
            }

            std::shared_ptr<boost::asio::ip::udp::socket> created = make_shared_object<boost::asio::ip::udp::socket>(*context);
            if (!created) {
                return created;
            }

            boost::asio::ip::address bindAddress = localEP.address();
            if (bindAddress.is_multicast()) {
                bindAddress = boost::asio::ip::address_v6::any();
            }

            const bool opened = frp::net::Socket::OpenSocket(*created, bindAddress, localEP.port());
            if (!opened) {
                return nullptr;
            }
            return created;
        }
    }
}

 类似资料: