Xtcp.h
#pragma once
#include <frp/IDisposable.h>
#include <frp/net/IPEndPoint.h>
#include <frp/threading/Hosting.h>
namespace frp {
namespace xtcp {
class Xtcp : public IDisposable {
public:
typedef std::function<void(int)> ReadAsyncCallback;
typedef std::function<void(bool)> WriteAsyncCallback;
private:
struct message {
std::shared_ptr<Byte> packet;
int packet_size;
WriteAsyncCallback callback;
};
typedef std::shared_ptr<message> pmessage;
typedef std::list<pmessage> message_queue;
public:
Xtcp(boost::asio::ip::udp::socket& socket, int channel, const boost::asio::ip::udp::endpoint& remoteEP) noexcept;
~Xtcp() noexcept;
public:
virtual bool ReadAsync(const void* buffer, int offset, int length, const ReadAsyncCallback& callback) noexcept;
virtual bool WriteAsync(const void* buffer, int offset, int length, const WriteAsyncCallback& callback) noexcept;
virtual void Dispose() noexcept override;
virtual void Update(UInt64 now) noexcept;
virtual UInt64 CurrentMillisec() noexcept;
public:
virtual bool IsAvailable() noexcept;
inline frp::net::AddressFamily GetAddressFamily() noexcept { return af_; }
inline int GetChannel() noexcept { return channel_; }
inline boost::asio::ip::udp::socket* GetSocket() noexcept { return socket_.load(); }
inline boost::asio::ip::udp::endpoint& GetRemoteEndPoint() noexcept { return remoteEP_; }
Xtcp& SetRemoteEndPoint(const boost::asio::ip::udp::endpoint& remoteEP) noexcept;
boost::asio::ip::udp::endpoint GetLocalEndPoint() noexcept;
public:
virtual bool Listen() noexcept;
virtual bool Open() noexcept;
void Close() noexcept;
bool NoDelay(bool value) noexcept;
int GetMinRetransmissionTimeout() noexcept;
bool SetMinRetransmissionTimeout(int timeout) noexcept;
public:
virtual std::shared_ptr<Byte> GetBuffer(int& length) = 0;
virtual bool Input(const void* buffer, int offset, int length, const boost::asio::ip::udp::endpoint& remoteEP) noexcept;
virtual bool Output(const void* buffer, int offset, int length, const boost::asio::ip::udp::endpoint& destinationEP) noexcept;
private:
bool Timeout() noexcept;
bool TryReadNetStack(void* state) noexcept;
void TryFlushNetStack(void* state) noexcept;
void Finalize() noexcept;
void CloseNetStack() noexcept;
void CloseSocket() noexcept;
void CancelTimeout() noexcept;
void CancelReadAsync() noexcept;
void ClearAllMessages() noexcept;
private:
std::atomic<bool> disposed_;
std::atomic<bool> open_;
std::atomic<bool> listen_;
std::atomic<void*> kcp_;
std::atomic<boost::asio::ip::udp::socket*> socket_;
frp::net::AddressFamily af_;
int channel_;
boost::asio::ip::udp::endpoint sourceEP_;
boost::asio::ip::udp::endpoint remoteEP_;
boost::asio::deadline_timer timeout_;
message_queue messages_;
struct {
char* buffer;
int length;
ReadAsyncCallback callback;
} readACB_;
};
class XtcpClient : public Xtcp {
public:
XtcpClient(const std::shared_ptr<frp::threading::Hosting>& hosting, const std::shared_ptr<boost::asio::io_context>& context, const std::shared_ptr<boost::asio::ip::udp::socket>& socket, int channel, const boost::asio::ip::udp::endpoint& remoteEP) noexcept;
XtcpClient(const std::shared_ptr<frp::threading::Hosting>& hosting, const std::shared_ptr<boost::asio::io_context>& context, int channel, const boost::asio::ip::udp::endpoint& remoteEP) noexcept;
XtcpClient(const std::shared_ptr<frp::threading::Hosting>& hosting, const std::shared_ptr<boost::asio::io_context>& context, int channel, const boost::asio::ip::udp::endpoint& localEP, const boost::asio::ip::udp::endpoint& remoteEP) noexcept;
XtcpClient(const std::shared_ptr<frp::threading::Hosting>& hosting, const std::shared_ptr<boost::asio::io_context>& context, int channel, const std::string& host, int port) noexcept;
public:
std::shared_ptr<boost::asio::ip::udp::socket> GetSocket() noexcept;
std::shared_ptr<boost::asio::io_context> GetContext() noexcept;
std::shared_ptr<frp::threading::Hosting> GetHosting() noexcept;
virtual std::shared_ptr<Byte> GetBuffer(int& length) noexcept override;
public:
static boost::asio::ip::udp::endpoint ToEndPoint(const std::string host, int port) noexcept;
static std::shared_ptr<boost::asio::ip::udp::socket> CreateSocket(const std::shared_ptr<boost::asio::io_context>& context, const boost::asio::ip::udp::endpoint& localEP) noexcept;
private:
std::shared_ptr<frp::threading::Hosting> hosting_;
std::shared_ptr<boost::asio::io_context> context_;
std::shared_ptr<boost::asio::ip::udp::socket> socket_;
std::shared_ptr<Byte> buffer_;
};
}
}
Xtcp.cpp
#include <frp/xtcp/Xtcp.h>
#include <frp/xtcp/ikcp.h>
#include <frp/net/Ipep.h>
#include <frp/net/Socket.h>
using frp::net::AddressFamily;
using frp::net::Ipep;
using frp::net::IPEndPoint;
namespace frp {
namespace xtcp {
struct IKcpAllocator {
inline IKcpAllocator() noexcept {
ikcp_allocator(
[](size_t sz) noexcept {
return Malloc(sz);
},
[](void* p) noexcept {
Mfree(p);
});
}
};
/* Set the default memory allocator for Xtcp globally. */
static IKcpAllocator __XtcpAllocator__;
Xtcp::Xtcp(boost::asio::ip::udp::socket& socket, int channel, const boost::asio::ip::udp::endpoint& remoteEP) noexcept
: disposed_(false)
, open_(false)
, listen_(false)
, kcp_(NULL)
, channel_(channel)
, socket_(addressof(socket))
, timeout_(socket.get_executor()) {
/* Initialize the Read Async Control Block. */
readACB_.buffer = NULL;
readACB_.length = 0;
/* If the socket is not opened, try opening the socket. */
if (!socket.is_open()) {
boost::asio::ip::address address = remoteEP.address(); /* Destination endpoint was incorrect. */
if (address.is_loopback()) {
address = boost::asio::ip::address_v6::loopback();
}
else {
address = boost::asio::ip::address_v6::any();
}
frp::net::Socket::OpenSocket(socket, address, IPEndPoint::MinPort);
}
/* Create a KCP-ARQ congestion control network tunnels. */
ikcpcb* kcp = ikcp_create(channel, this);
if (kcp) {
/* Sets the function properties of the KCP protocol stack. */
ikcp_setmtu(kcp, 1400);
ikcp_wndsize(kcp, 128, 128);
/* Sets the field properties of the KCP protocol stack. */
ikcp_setoutput(kcp,
[](const char* buf, int len, ikcpcb* kcp, void* user) noexcept {
Xtcp* sender = (Xtcp*)user;
return sender->Output(buf, 0, len, sender->GetRemoteEndPoint()) ? -1 : 0;
});
}
kcp_.exchange(kcp);
/* Nagle algorithm is turned on by default. */
NoDelay(false);
/* Determines the IP protocol cluster type of the current socket. */
boost::system::error_code ec;
af_ = AddressFamily::InterNetwork;
boost::asio::ip::udp::endpoint localEP = socket.local_endpoint(ec);
if (!ec) {
boost::asio::ip::address localIP = localEP.address();
if (localIP.is_v6()) {
af_ = AddressFamily::InterNetworkV6;
}
}
/* Marks the current remote endpoint address. */
SetRemoteEndPoint(remoteEP);
}
Xtcp::~Xtcp() noexcept {
Finalize();
}
void Xtcp::Finalize() noexcept {
CloseNetStack();
CloseSocket();
CancelTimeout();
CancelReadAsync();
ClearAllMessages();
}
void Xtcp::Close() noexcept {
Dispose();
}
void Xtcp::Dispose() noexcept {
if (!disposed_.exchange(true)) {
Finalize();
}
}
bool Xtcp::Open() noexcept {
boost::asio::ip::udp::socket* socket = socket_.load();
if (!socket) {
return false;
}
if (!socket->is_open()) {
return false;
}
if (disposed_) {
return false;
}
if (open_.exchange(true)) {
return false;
}
return Timeout();
}
bool Xtcp::Listen() noexcept {
boost::asio::ip::udp::socket* socket = socket_.load();
if (!socket) {
return false;
}
if (!socket->is_open()) {
return false;
}
if (disposed_) {
return false;
}
if (!open_) {
return false;
}
if (listen_.exchange(true)) {
return false;
}
int buffer_size = 0;
std::shared_ptr<Byte> buffer = GetBuffer(buffer_size);
std::shared_ptr<Reference> reference = GetReference();
socket->async_receive_from(boost::asio::buffer(buffer.get(), buffer_size), sourceEP_,
[reference, this, buffer](const boost::system::error_code& ec, std::size_t sz) noexcept {
if (listen_.exchange(false)) {
int len = std::max<int>(ec ? -1 : sz, -1);
if (len > 0) {
Input(buffer.get(), 0, len, sourceEP_);
}
Listen();
}
});
return true;
}
bool Xtcp::Timeout() noexcept {
if (!IsAvailable()) {
return false;
}
std::shared_ptr<Reference> reference = GetReference();
timeout_.expires_from_now(boost::posix_time::milliseconds(1));
timeout_.async_wait(
[reference, this](const boost::system::error_code& ec) noexcept {
if (!ec) {
Update(CurrentMillisec());
Timeout();
}
});
return true;
}
bool Xtcp::NoDelay(bool value) noexcept {
ikcpcb* kcp = (ikcpcb*)kcp_.load();
if (!kcp) {
return false;
}
int nodelay = value ? 2 : 0;
int hr = ikcp_nodelay(kcp, nodelay, 10, 2, 1);
if (hr < 0) {
return false;
}
return true;
}
int Xtcp::GetMinRetransmissionTimeout() noexcept {
ikcpcb* kcp = (ikcpcb*)kcp_.load();
if (!kcp) {
return -1;
}
return kcp->rx_minrto;
}
bool Xtcp::SetMinRetransmissionTimeout(int timeout) noexcept {
if (timeout < 10) {
timeout = 10;
}
ikcpcb* kcp = (ikcpcb*)kcp_.load();
if (!kcp) {
return false;
}
kcp->rx_minrto = timeout;
return true;
}
UInt64 Xtcp::CurrentMillisec() noexcept {
return GetTickCount();
}
void Xtcp::Update(UInt64 now) noexcept {
ikcpcb* kcp = (ikcpcb*)kcp_.load();
if (kcp) {
TryReadNetStack(kcp);
TryFlushNetStack(kcp);
ikcp_update(kcp, now & 0xfffffffful);
}
}
bool Xtcp::Output(const void* buffer, int offset, int length, const boost::asio::ip::udp::endpoint& destinationEP) noexcept {
/* Check the validity of the input parameters. */
if (!buffer || offset < 0 || length < 1) {
return false;
}
/* Atom gets the pointer address of the socket and ret out if null. */
boost::asio::ip::udp::socket* socket = socket_.load();
if (!socket) {
return false;
}
if (!socket->is_open()) {
return false;
}
/* Sending data to the extranet. */
boost::system::error_code ec;
socket->send_to(boost::asio::buffer((char*)buffer + offset, length), destinationEP, 0, ec);
return ec ? false : true;
}
void Xtcp::CloseNetStack() noexcept {
ikcpcb* kcp = (ikcpcb*)kcp_.exchange(NULL);
if (kcp) {
ikcp_release(kcp);
}
}
void Xtcp::CloseSocket() noexcept {
boost::asio::ip::udp::socket* socket = socket_.exchange(NULL);
if (socket) {
frp::net::Socket::Closesocket(*socket);
}
}
void Xtcp::CancelTimeout() noexcept {
frp::threading::Hosting::Cancel(timeout_);
}
void Xtcp::ClearAllMessages() noexcept {
std::vector<pmessage> messages;
message_queue::iterator tail = messages_.begin();
message_queue::iterator endl = messages_.end();
for (; tail != endl; tail++) {
messages.push_back(std::move(*tail));
}
messages_.clear();
for (std::size_t index = 0, length = messages.size(); index < length; index++) {
pmessage message = std::move(messages[index]);
if (!message) {
continue;
}
WriteAsyncCallback& handler = message->callback;
if (handler) {
handler(false);
}
}
}
void Xtcp::CancelReadAsync() noexcept {
ReadAsyncCallback handler = std::move(readACB_.callback);
readACB_.buffer = NULL;
readACB_.length = 0;
readACB_.callback = NULL;
if (handler) {
handler(-1);
}
}
void Xtcp::TryFlushNetStack(void* state) noexcept {
/* Check whether the peer address is valid. */
boost::asio::ip::address destinationAddress = remoteEP_.address();
if (destinationAddress.is_multicast() || destinationAddress.is_unspecified()) {
return;
}
/* Check whether the peer port is valid. */
int destinationPort = remoteEP_.port();
if (destinationPort <= IPEndPoint::MinPort || destinationPort > IPEndPoint::MaxPort) {
return;
}
/* The loop attempts to send segmented data to the peer. */
ikcpcb* kcp = (ikcpcb*)state;
while (kcp) { /* FIFO */
message_queue::iterator tail = messages_.begin();
message_queue::iterator endl = messages_.end();
if (tail == endl) {
break;
}
pmessage& message = *tail;
if (message) {
int hr = ikcp_send(kcp, (char*)message->packet.get(), message->packet_size);
if (hr < 0) {
break;
}
WriteAsyncCallback& handler = message->callback;
if (handler) {
handler(true);
}
}
messages_.erase(tail);
}
}
bool Xtcp::TryReadNetStack(void* state) noexcept {
/* The state pointer must be a handle to KCP. */
ikcpcb* kcp = (ikcpcb*)state;
if (!kcp) {
return false;
}
/* Check whether an asynchronous retrieval task exists. */
char* buffer = readACB_.buffer;
if (!buffer) {
return false;
}
/* Attempts to receive data from the protocol stack cache. */
int len = ikcp_recv(kcp, buffer, readACB_.length);
if (len < 0) {
return true;
}
/* Reset the Read Async Control Block. */
ReadAsyncCallback handler = std::move(readACB_.callback);
readACB_.buffer = NULL;
readACB_.length = 0;
readACB_.callback = NULL;
/* Callback this asynchronous data acquisition. */
if (handler) {
handler(len);
}
return true;
}
bool Xtcp::Input(const void* buffer, int offset, int length, const boost::asio::ip::udp::endpoint& remoteEP) noexcept {
if (!buffer || offset < 0 || length < 1) {
return false;
}
ikcpcb* kcp = (ikcpcb*)kcp_.load();
if (!kcp) {
return false;
}
int hr = ikcp_input(kcp, (char*)buffer + offset, length);
if (hr < 0) {
return false;
}
/* UDP/IP port mapped by the carrier's network router to the user's dynamic NAT may change, but it does not change for the local broadband user's port. */
/* So people need to make sure that every time a read Input succeeds they should try to overwrite the original value. */
SetRemoteEndPoint(remoteEP);
/* An attempt is made to send segmented data accumulated in the send queue to the peer end. */
TryReadNetStack(kcp);
TryFlushNetStack(kcp);
return true;
}
bool Xtcp::IsAvailable() noexcept {
boost::asio::ip::udp::socket* socket = socket_.load();
if (!socket) {
return false;
}
if (!socket->is_open()) {
return false;
}
if (disposed_) {
return false;
}
if (!open_) {
return false;
}
return NULL != kcp_.load();
}
boost::asio::ip::udp::endpoint Xtcp::GetLocalEndPoint() noexcept {
boost::asio::ip::udp::socket* socket = socket_.load();
if (socket) {
if (socket->is_open()) {
boost::system::error_code ec;
boost::asio::ip::udp::endpoint localEP = socket->local_endpoint(ec);
if (!ec) {
return localEP;
}
}
}
return IPEndPoint::ToEndPoint<boost::asio::ip::udp>(IPEndPoint());
}
Xtcp& Xtcp::SetRemoteEndPoint(const boost::asio::ip::udp::endpoint& remoteEP) noexcept {
remoteEP_ = IPEndPoint::Transform(af_, remoteEP);
return *this;
}
bool Xtcp::ReadAsync(const void* buffer, int offset, int length, const ReadAsyncCallback& callback) noexcept {
if (!buffer || offset < 0 || length < 1 || !callback) {
return false;
}
if (!IsAvailable()) {
return false;
}
/* Multiple pending asynchronous reads are not allowed at the same time. */
if (readACB_.buffer) {
return false;
}
/* Set the control block of the asynchronous read task. */
readACB_.buffer = (char*)buffer + offset;
readACB_.length = length;
readACB_.callback = callback;
return TryReadNetStack(kcp_.load());
}
bool Xtcp::WriteAsync(const void* buffer, int offset, int length, const WriteAsyncCallback& callback) noexcept {
if (!buffer || offset < 0 || length < 1) {
return false;
}
ikcpcb* kcp = (ikcpcb*)kcp_.load();
if (!IsAvailable()) {
return false;
}
std::shared_ptr<Byte> messages = make_shared_alloc<Byte>(length);
memcpy(messages.get(), ((Byte*)buffer) + offset, length);
/* Insert the flow controlled data segment into the send queues. */
pmessage message_ = make_shared_object<message>();
message_->packet = messages;
message_->packet_size = length;
message_->callback = callback;
messages_.push_back(message_);
/* Attempt to flush all message accumulated in the message queues. */
TryReadNetStack(kcp);
TryFlushNetStack(kcp);
return true;
}
XtcpClient::XtcpClient(const std::shared_ptr<frp::threading::Hosting>& hosting, const std::shared_ptr<boost::asio::io_context>& context, const std::shared_ptr<boost::asio::ip::udp::socket>& socket, int channel, const boost::asio::ip::udp::endpoint& remoteEP) noexcept
: Xtcp(*socket, channel, remoteEP)
, hosting_(hosting)
, context_(context)
, socket_(socket) {
buffer_ = hosting->GetBuffer();
}
XtcpClient::XtcpClient(const std::shared_ptr<frp::threading::Hosting>& hosting, const std::shared_ptr<boost::asio::io_context>& context, int channel, const boost::asio::ip::udp::endpoint& remoteEP) noexcept
: XtcpClient(hosting, context, make_shared_object<boost::asio::ip::udp::socket>(*context), channel, remoteEP) {
}
XtcpClient::XtcpClient(const std::shared_ptr<frp::threading::Hosting>& hosting, const std::shared_ptr<boost::asio::io_context>& context, int channel, const boost::asio::ip::udp::endpoint& localEP, const boost::asio::ip::udp::endpoint& remoteEP) noexcept
: XtcpClient(hosting, context, CreateSocket(context, localEP), channel, remoteEP) {
}
XtcpClient::XtcpClient(const std::shared_ptr<frp::threading::Hosting>& hosting, const std::shared_ptr<boost::asio::io_context>& context, int channel, const std::string& host, int port) noexcept
: XtcpClient(hosting, context, channel, ToEndPoint(host, port)) {
}
std::shared_ptr<boost::asio::io_context> XtcpClient::GetContext() noexcept {
return context_;
}
std::shared_ptr<boost::asio::ip::udp::socket> XtcpClient::GetSocket() noexcept {
return socket_;
}
std::shared_ptr<frp::threading::Hosting> XtcpClient::GetHosting() noexcept {
return hosting_;
}
std::shared_ptr<Byte> XtcpClient::GetBuffer(int& length) noexcept {
length = frp::threading::Hosting::BufferSize;
return buffer_;
}
boost::asio::ip::udp::endpoint XtcpClient::ToEndPoint(const std::string host, int port) noexcept {
return IPEndPoint::ToEndPoint<boost::asio::ip::udp>(Ipep::GetEndPoint(host, port));
}
std::shared_ptr<boost::asio::ip::udp::socket> XtcpClient::CreateSocket(const std::shared_ptr<boost::asio::io_context>& context, const boost::asio::ip::udp::endpoint& localEP) noexcept {
if (!context) {
return NULL;
}
std::shared_ptr<boost::asio::ip::udp::socket> socket = make_shared_object<boost::asio::ip::udp::socket>(*context);
if (socket) {
boost::asio::ip::address address = localEP.address();
if (address.is_multicast()) {
address = boost::asio::ip::address_v6::any();
}
if (!frp::net::Socket::OpenSocket(*socket, address, localEP.port())) {
return NULL;
}
}
return socket;
}
}
}