From a5b1f2d9760f284fc9e7ad3c26cbba59828fafbc Mon Sep 17 00:00:00 2001 From: csoler Date: Sun, 14 Nov 2021 23:31:40 +0100 Subject: [PATCH] created independent tcpsocket class to be used also in TorManager --- .../friend_server/{fsbio.cc => socketbio.cc} | 139 ++++++++++++++++-- .../friend_server/{fsbio.h => socketbio.h} | 19 ++- libretroshare/src/friend_server/tcpsocket.cc | 75 ++++++++++ libretroshare/src/friend_server/tcpsocket.h | 41 ++++++ libretroshare/src/libretroshare.pro | 10 +- 5 files changed, 263 insertions(+), 21 deletions(-) rename libretroshare/src/friend_server/{fsbio.cc => socketbio.cc} (61%) rename libretroshare/src/friend_server/{fsbio.h => socketbio.h} (82%) create mode 100644 libretroshare/src/friend_server/tcpsocket.cc create mode 100644 libretroshare/src/friend_server/tcpsocket.h diff --git a/libretroshare/src/friend_server/fsbio.cc b/libretroshare/src/friend_server/socketbio.cc similarity index 61% rename from libretroshare/src/friend_server/fsbio.cc rename to libretroshare/src/friend_server/socketbio.cc index 313e073d7..9caa45d56 100644 --- a/libretroshare/src/friend_server/fsbio.cc +++ b/libretroshare/src/friend_server/socketbio.cc @@ -24,16 +24,43 @@ #include "fsbio.h" FsBioInterface::FsBioInterface(int socket) - : mCLintConnt(socket),mIsActive(true) + : mCLintConnt(socket),mIsActive(socket!=0) { mTotalReadBytes=0; - mTotalBufferBytes=0; + mTotalInBufferBytes=0; + mTotalWrittenBytes=0; + mTotalOutBufferBytes=0; } +void FsBioInterface::setSocket(int s) +{ + if(mIsActive != 0) + { + RsErr() << "Changing socket to active FsBioInterface! Canceling all pending R/W data." ; + close(); + } + mCLintConnt = s; + mIsActive = (s!=0); +} int FsBioInterface::tick() { + if(!mIsActive) + { + RsErr() << "Ticking a non active FsBioInterface!" ; + return 0; + } // 2 - read incoming data pending on existing connections + int res=0; + + res += read_pending(); + res += write_pending(); + + return res; +} + +int FsBioInterface::read_pending() +{ char inBuffer[1025]; memset(inBuffer,0,1025); @@ -45,14 +72,14 @@ int FsBioInterface::tick() RsDbg() << "Closing!" ; close(); - return mTotalBufferBytes; + return mTotalInBufferBytes; } if(readbytes < 0) { if(errno != EWOULDBLOCK && errno != EAGAIN) RsErr() << "read() failed. Errno=" << errno ; - return mTotalBufferBytes; + return mTotalInBufferBytes; } RsDbg() << "clintConnt: " << mCLintConnt << ", readbytes: " << readbytes ; @@ -72,15 +99,80 @@ int FsBioInterface::tick() memcpy(ptr,inBuffer,readbytes); in_buffer.push_back(std::make_pair(ptr,readbytes)); - mTotalBufferBytes += readbytes; + mTotalInBufferBytes += readbytes; mTotalReadBytes += readbytes; - RsDbg() << "Socket: " << mCLintConnt << ". Total read: " << mTotalReadBytes << ". Buffer size: " << mTotalBufferBytes ; + RsDbg() << "Socket: " << mCLintConnt << ". Total read: " << mTotalReadBytes << ". Buffer size: " << mTotalInBufferBytes ; } - - return mTotalBufferBytes; + return mTotalInBufferBytes; } +int FsBioInterface::write_pending() +{ + if(out_buffer.empty()) + return mTotalOutBufferBytes; + + auto& p = out_buffer.front(); + int written = write(mCLintConnt, p.first, p.second); + + if(written < 0) + { + if(errno != EWOULDBLOCK && errno != EAGAIN) + RsErr() << "write() failed. Errno=" << errno ; + + return mTotalOutBufferBytes; + } + + if(written == 0) + { + RsErr() << "write() failed. Nothing sent."; + return mTotalOutBufferBytes; + } + + RsDbg() << "clintConnt: " << mCLintConnt << ", written: " << written ; + + // display some debug info + + RsDbg() << "Sent the following bytes: " << RsUtil::BinToHex( reinterpret_cast(p.first),written,50) << std::endl; + + if(written < p.second) + { + void *ptr = malloc(p.second - written); + + if(!ptr) + throw std::runtime_error("Cannot allocate memory! Go buy some RAM!"); + + memcpy(ptr,static_cast(p.first) + written,p.second - written); + free(p.first); + + out_buffer.front().first = ptr; + out_buffer.front().second = p.second - written; + } + else + { + free(p.first); + out_buffer.pop_front(); + } + + mTotalOutBufferBytes -= written; + mTotalWrittenBytes += written; + + return mTotalOutBufferBytes; +} + +FsBioInterface::~FsBioInterface() +{ + clean(); +} + +void FsBioInterface::clean() +{ + for(auto p:in_buffer) free(p.first); + for(auto p:out_buffer) free(p.first); + + in_buffer.clear(); + out_buffer.clear(); +} int FsBioInterface::readdata(void *data, int len) { // read incoming bytes in the buffer @@ -91,7 +183,7 @@ int FsBioInterface::readdata(void *data, int len) { if(in_buffer.empty()) { - mTotalBufferBytes -= total_len; + mTotalInBufferBytes -= total_len; return total_len; } @@ -108,7 +200,7 @@ int FsBioInterface::readdata(void *data, int len) in_buffer.front().first = ptr; in_buffer.front().second -= len-total_len; - mTotalBufferBytes -= len; + mTotalInBufferBytes -= len; return len; } else // copy everything @@ -121,7 +213,7 @@ int FsBioInterface::readdata(void *data, int len) in_buffer.pop_front(); } } - mTotalBufferBytes -= len; + mTotalInBufferBytes -= len; return len; } @@ -129,12 +221,24 @@ int FsBioInterface::senddata(void *data, int len) { // shouldn't we better send in multiple packets, similarly to how we read? - RsDbg() << "FsBioInterface: sending data packet of size " << len ; + if(len == 0) + { + RsErr() << "Calling FsBioInterface::senddata() with null size or null data pointer"; + return 0; + } + void *ptr = malloc(len); - int written = write(mCLintConnt, data, len); - RsDbg() << "FsBioInterface: done."; + if(!ptr) + { + RsErr() << "Cannot allocate data of size " << len ; + return 0; + } - return written; + memcpy(ptr,data,len); + out_buffer.push_back(std::make_pair(ptr,len)); + + mTotalOutBufferBytes += len; + return len; } int FsBioInterface::netstatus() { @@ -148,7 +252,7 @@ int FsBioInterface::isactive() bool FsBioInterface::moretoread(uint32_t /* usec */) { - return mTotalBufferBytes > 0; + return mTotalInBufferBytes > 0; } bool FsBioInterface::cansend(uint32_t) { @@ -159,6 +263,9 @@ int FsBioInterface::close() { RsDbg() << "Stopping network interface" << std::endl; mIsActive = false; + mCLintConnt = 0; + clean(); + return 1; } diff --git a/libretroshare/src/friend_server/fsbio.h b/libretroshare/src/friend_server/socketbio.h similarity index 82% rename from libretroshare/src/friend_server/fsbio.h rename to libretroshare/src/friend_server/socketbio.h index 6b2ed06bd..697719d74 100644 --- a/libretroshare/src/friend_server/fsbio.h +++ b/libretroshare/src/friend_server/socketbio.h @@ -26,12 +26,19 @@ class FsBioInterface: public BinInterface { public: FsBioInterface(int socket); + ~FsBioInterface(); // Implements BinInterface methods int tick() override; + // Schedule data to be sent at the next tick(). The caller keeps memory ownership. + // int senddata(void *data, int len) override; + + // Obtains new data from the interface. "data" needs to be initialized for room + // to len bytes. The returned value is the actual size of what was read. + // int readdata(void *data, int len) override; int netstatus() override; @@ -49,12 +56,22 @@ public: bool bandwidthLimited() override { return false; } +protected: + void setSocket(int s); + void clean(); + private: + int read_pending(); + int write_pending(); + int mCLintConnt; bool mIsActive; uint32_t mTotalReadBytes; - uint32_t mTotalBufferBytes; + uint32_t mTotalInBufferBytes; + uint32_t mTotalWrittenBytes; + uint32_t mTotalOutBufferBytes; std::list > in_buffer; + std::list > out_buffer; }; diff --git a/libretroshare/src/friend_server/tcpsocket.cc b/libretroshare/src/friend_server/tcpsocket.cc new file mode 100644 index 000000000..19b69d1b2 --- /dev/null +++ b/libretroshare/src/friend_server/tcpsocket.cc @@ -0,0 +1,75 @@ +#include +#include +#include +#include +#include + +#include "tcpsocket.h" + +TcpSocket::TcpSocket(const std::string& tcp_address,uint16_t tcp_port) + :FsBioInterface(0),mState(DISCONNECTED),mConnectAddress(tcp_address),mConnectPort(tcp_port),mSocket(0) +{ +} +int TcpSocket::connect() +{ + int CreateSocket = 0; + char dataReceived[1024]; + struct sockaddr_in ipOfServer; + + memset(dataReceived, '0' ,sizeof(dataReceived)); + + if((CreateSocket = socket(AF_INET, SOCK_STREAM, 0))< 0) + { + printf("Socket not created \n"); + return false; + } + + ipOfServer.sin_family = AF_INET; + ipOfServer.sin_port = htons(mConnectPort); + ipOfServer.sin_addr.s_addr = inet_addr(mConnectAddress.c_str()); + + if(::connect(mSocket, (struct sockaddr *)&ipOfServer, sizeof(ipOfServer))<0) + { + printf("Connection failed due to port and ip problems, or server is not available\n"); + return false; + } + mState = CONNECTED; + setSocket(mSocket); + + return true; +} + +int TcpSocket::close() +{ + FsBioInterface::close(); + + return !::close(mSocket); +} + +ThreadedTcpSocket::ThreadedTcpSocket(const std::string& tcp_address,uint16_t tcp_port) + : TcpSocket(tcp_address,tcp_port) +{ +} + +void ThreadedTcpSocket::run() +{ + if(!connect()) + { + RsErr() << "Cannot connect socket to " << connectAddress() << ":" << connectPort() ; + return ; + } + + while(connectionState() == CONNECTED) + { + tick(); + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + } + RsWarn() << "Connection to " << connectAddress() << ":" << connectPort() << " is now closed."; +} + +ThreadedTcpSocket::~ThreadedTcpSocket() +{ + fullstop(); // fully wait for stopping. + + close(); +} diff --git a/libretroshare/src/friend_server/tcpsocket.h b/libretroshare/src/friend_server/tcpsocket.h new file mode 100644 index 000000000..f35cefb71 --- /dev/null +++ b/libretroshare/src/friend_server/tcpsocket.h @@ -0,0 +1,41 @@ +#include +#include "util/rsthreads.h" +#include "friend_server/fsbio.h" + +class TcpSocket: public FsBioInterface +{ +public: + TcpSocket(const std::string& tcp_address,uint16_t tcp_port); + + enum State: uint8_t { + UNKNOWN = 0x00, + DISCONNECTED = 0x01, + CONNECTED = 0x02 + }; + + // Return 1 when OK, 0 otherwise. + int connect(); + + // Returns 1 when OK, 0 otherwise. + int close(); + + State connectionState() const { return mState; } + const std::string& connectAddress() const { return mConnectAddress ; } + uint16_t connectPort() const { return mConnectPort ; } + +private: + State mState; + std::string mConnectAddress; + uint16_t mConnectPort; + int mSocket; +}; + +class ThreadedTcpSocket: public TcpSocket, public RsThread +{ +public: + ThreadedTcpSocket(const std::string& tcp_address,uint16_t tcp_port); + virtual ~ThreadedTcpSocket(); + + virtual void run() override; +}; + diff --git a/libretroshare/src/libretroshare.pro b/libretroshare/src/libretroshare.pro index c87076e32..8eda33e22 100644 --- a/libretroshare/src/libretroshare.pro +++ b/libretroshare/src/libretroshare.pro @@ -156,6 +156,7 @@ rs_webui { } HEADERS += plugins/pluginmanager.h \ + friend_server/socketbio.h \ plugins/dlfcn_win32.h \ rsitems/rspluginitems.h \ util/i2pcommon.h \ @@ -404,9 +405,10 @@ HEADERS += pqi/authssl.h \ pqi/p3servicecontrol.h SOURCES += friend_server/fsclient.h \ - friend_server/fsbio.h \ friend_server/fsitem.h \ - friend_server/fsmanager.h + friend_server/fsmanager.h \ + friend_server/socketbio.cc \ + friend_server/tcpsocket.h HEADERS += rsserver/p3face.h \ rsserver/p3history.h \ @@ -572,8 +574,8 @@ SOURCES += pqi/authgpg.cc \ pqi/p3servicecontrol.cc SOURCES += friend_server/fsclient.cc \ - friend_server/fsbio.cc \ - friend_server/fsmanager.cc + friend_server/fsmanager.cc \ + friend_server/tcpsocket.cc SOURCES += rsserver/p3face-config.cc \ rsserver/p3face-server.cc \