diff --git a/libretroshare/src/pqi/pqifdbin.cc b/libretroshare/src/pqi/pqifdbin.cc new file mode 100644 index 000000000..68798bf3d --- /dev/null +++ b/libretroshare/src/pqi/pqifdbin.cc @@ -0,0 +1,272 @@ +/******************************************************************************* + * libretroshare/src/file_sharing: fsbio.cc * + * * + * libretroshare: retroshare core library * + * * + * Copyright 2021 by retroshare team * + * * + * This program is free software: you can redistribute it and/or modify * + * it under the terms of the GNU Lesser General Public License as * + * published by the Free Software Foundation, either version 3 of the * + * License, or (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU Lesser General Public License for more details. * + * * + * You should have received a copy of the GNU Lesser General Public License * + * along with this program. If not, see . * + * * + ******************************************************************************/ + +#include "util/rsprint.h" +#include "pqi/pqifdbin.h" + +RsFdBinInterface::RsFdBinInterface(int file_descriptor) + : mCLintConnt(file_descriptor),mIsActive(file_descriptor!=0) +{ + mTotalReadBytes=0; + mTotalInBufferBytes=0; + mTotalWrittenBytes=0; + mTotalOutBufferBytes=0; +} + +void RsFdBinInterface::setSocket(int s) +{ + if(mIsActive != 0) + { + RsErr() << "Changing socket to active FsBioInterface! Canceling all pending R/W data." ; + close(); + } + mCLintConnt = s; + mIsActive = (s!=0); +} +int RsFdBinInterface::tick() +{ + if(!mIsActive) + { + RsErr() << "Ticking a non active FsBioInterface!" ; + return 0; + } + // 2 - read incoming data pending on existing connections + + int res=0; + + res += read_pending(); + res += write_pending(); + + return res; +} + +int RsFdBinInterface::read_pending() +{ + char inBuffer[1025]; + memset(inBuffer,0,1025); + + ssize_t readbytes = recv(mCLintConnt, inBuffer, sizeof(inBuffer),MSG_DONTWAIT); + + if(readbytes == 0) + { + RsDbg() << "Reached END of the stream!" ; + RsDbg() << "Closing!" ; + + close(); + return mTotalInBufferBytes; + } + if(readbytes < 0) + { + if(errno != EWOULDBLOCK && errno != EAGAIN) + RsErr() << "read() failed. Errno=" << errno ; + + return mTotalInBufferBytes; + } + + RsDbg() << "clintConnt: " << mCLintConnt << ", readbytes: " << readbytes ; + + // display some debug info + + if(readbytes > 0) + { + RsDbg() << "Received the following bytes: " << RsUtil::BinToHex( reinterpret_cast(inBuffer),readbytes,50) << std::endl; + //RsDbg() << "Received the following bytes: " << std::string(inBuffer,readbytes) << std::endl; + + void *ptr = malloc(readbytes); + + if(!ptr) + throw std::runtime_error("Cannot allocate memory! Go buy some RAM!"); + + memcpy(ptr,inBuffer,readbytes); + + in_buffer.push_back(std::make_pair(ptr,readbytes)); + mTotalInBufferBytes += readbytes; + mTotalReadBytes += readbytes; + + RsDbg() << "Socket: " << mCLintConnt << ". Total read: " << mTotalReadBytes << ". Buffer size: " << mTotalInBufferBytes ; + } + return mTotalInBufferBytes; +} + +int RsFdBinInterface::write_pending() +{ + if(out_buffer.empty()) + return mTotalOutBufferBytes; + + auto& p = out_buffer.front(); + int written = write(mCLintConnt, p.first, p.second); + + if(written < 0) + { + if(errno != EWOULDBLOCK && errno != EAGAIN) + RsErr() << "write() failed. Errno=" << errno ; + + return mTotalOutBufferBytes; + } + + if(written == 0) + { + RsErr() << "write() failed. Nothing sent."; + return mTotalOutBufferBytes; + } + + RsDbg() << "clintConnt: " << mCLintConnt << ", written: " << written ; + + // display some debug info + + RsDbg() << "Sent the following bytes: " << RsUtil::BinToHex( reinterpret_cast(p.first),written,50) << std::endl; + + if(written < p.second) + { + void *ptr = malloc(p.second - written); + + if(!ptr) + throw std::runtime_error("Cannot allocate memory! Go buy some RAM!"); + + memcpy(ptr,static_cast(p.first) + written,p.second - written); + free(p.first); + + out_buffer.front().first = ptr; + out_buffer.front().second = p.second - written; + } + else + { + free(p.first); + out_buffer.pop_front(); + } + + mTotalOutBufferBytes -= written; + mTotalWrittenBytes += written; + + return mTotalOutBufferBytes; +} + +RsFdBinInterface::~RsFdBinInterface() +{ + clean(); +} + +void RsFdBinInterface::clean() +{ + for(auto p:in_buffer) free(p.first); + for(auto p:out_buffer) free(p.first); + + in_buffer.clear(); + out_buffer.clear(); +} +int RsFdBinInterface::readdata(void *data, int len) +{ + // read incoming bytes in the buffer + + int total_len = 0; + + while(total_len < len) + { + if(in_buffer.empty()) + { + mTotalInBufferBytes -= total_len; + return total_len; + } + + // If the remaining buffer is too large, chop of the beginning of it. + + if(total_len + in_buffer.front().second > len) + { + memcpy(&(static_cast(data)[total_len]),in_buffer.front().first,len - total_len); + + void *ptr = malloc(in_buffer.front().second - (len - total_len)); + memcpy(ptr,&(static_cast(in_buffer.front().first)[len - total_len]),in_buffer.front().second - (len - total_len)); + + free(in_buffer.front().first); + in_buffer.front().first = ptr; + in_buffer.front().second -= len-total_len; + + mTotalInBufferBytes -= len; + return len; + } + else // copy everything + { + memcpy(&(static_cast(data)[total_len]),in_buffer.front().first,in_buffer.front().second); + + total_len += in_buffer.front().second; + + free(in_buffer.front().first); + in_buffer.pop_front(); + } + } + mTotalInBufferBytes -= len; + return len; +} + +int RsFdBinInterface::senddata(void *data, int len) +{ + // shouldn't we better send in multiple packets, similarly to how we read? + + if(len == 0) + { + RsErr() << "Calling FsBioInterface::senddata() with null size or null data pointer"; + return 0; + } + void *ptr = malloc(len); + + if(!ptr) + { + RsErr() << "Cannot allocate data of size " << len ; + return 0; + } + + memcpy(ptr,data,len); + out_buffer.push_back(std::make_pair(ptr,len)); + + mTotalOutBufferBytes += len; + return len; +} +int RsFdBinInterface::netstatus() +{ + return mIsActive; // dummy response. +} + +int RsFdBinInterface::isactive() +{ + return mIsActive ; +} + +bool RsFdBinInterface::moretoread(uint32_t /* usec */) +{ + return mTotalInBufferBytes > 0; +} +bool RsFdBinInterface::cansend(uint32_t) +{ + return isactive(); +} + +int RsFdBinInterface::close() +{ + RsDbg() << "Stopping network interface" << std::endl; + mIsActive = false; + mCLintConnt = 0; + clean(); + + return 1; +} + + diff --git a/libretroshare/src/pqi/pqifdbin.h b/libretroshare/src/pqi/pqifdbin.h new file mode 100644 index 000000000..4c532b436 --- /dev/null +++ b/libretroshare/src/pqi/pqifdbin.h @@ -0,0 +1,77 @@ +/******************************************************************************* + * libretroshare/src/file_sharing: fsbio.h * + * * + * libretroshare: retroshare core library * + * * + * Copyright 2021 by retroshare team * + * * + * This program is free software: you can redistribute it and/or modify * + * it under the terms of the GNU Lesser General Public License as * + * published by the Free Software Foundation, either version 3 of the * + * License, or (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU Lesser General Public License for more details. * + * * + * You should have received a copy of the GNU Lesser General Public License * + * along with this program. If not, see . * + * * + ******************************************************************************/ + +#include "pqi/pqi_base.h" + +class RsFdBinInterface: public BinInterface +{ +public: + RsFdBinInterface(int file_descriptor); + ~RsFdBinInterface(); + + // Implements BinInterface methods + + int tick() override; + + // Schedule data to be sent at the next tick(). The caller keeps memory ownership. + // + int senddata(void *data, int len) override; + + // Obtains new data from the interface. "data" needs to be initialized for room + // to len bytes. The returned value is the actual size of what was read. + // + int readdata(void *data, int len) override; + + int netstatus() override; + int isactive() override; + bool moretoread(uint32_t usec) override; + bool cansend(uint32_t usec) override; + + int close() override; + + /** + * If hashing data + **/ + RsFileHash gethash() override { return RsFileHash() ; } + uint64_t bytecount() override { return mTotalReadBytes; } + + bool bandwidthLimited() override { return false; } + +protected: + void setSocket(int s); + void clean(); + +private: + int read_pending(); + int write_pending(); + + int mCLintConnt; + bool mIsActive; + uint32_t mTotalReadBytes; + uint32_t mTotalInBufferBytes; + uint32_t mTotalWrittenBytes; + uint32_t mTotalOutBufferBytes; + + std::list > in_buffer; + std::list > out_buffer; +}; + diff --git a/libretroshare/src/pqi/rstcpsocket.cc b/libretroshare/src/pqi/rstcpsocket.cc new file mode 100644 index 000000000..25f830914 --- /dev/null +++ b/libretroshare/src/pqi/rstcpsocket.cc @@ -0,0 +1,75 @@ +#include +#include +#include +#include +#include + +#include "rstcpsocket.h" + +RsTcpSocket::RsTcpSocket(const std::string& tcp_address,uint16_t tcp_port) + :RsFdBinInterface(0),mState(DISCONNECTED),mConnectAddress(tcp_address),mConnectPort(tcp_port),mSocket(0) +{ +} +int RsTcpSocket::connect() +{ + int CreateSocket = 0; + char dataReceived[1024]; + struct sockaddr_in ipOfServer; + + memset(dataReceived, '0' ,sizeof(dataReceived)); + + if((CreateSocket = socket(AF_INET, SOCK_STREAM, 0))< 0) + { + printf("Socket not created \n"); + return false; + } + + ipOfServer.sin_family = AF_INET; + ipOfServer.sin_port = htons(mConnectPort); + ipOfServer.sin_addr.s_addr = inet_addr(mConnectAddress.c_str()); + + if(::connect(mSocket, (struct sockaddr *)&ipOfServer, sizeof(ipOfServer))<0) + { + printf("Connection failed due to port and ip problems, or server is not available\n"); + return false; + } + mState = CONNECTED; + setSocket(mSocket); + + return true; +} + +int RsTcpSocket::close() +{ + RsFdBinInterface::close(); + + return !::close(mSocket); +} + +RsThreadedTcpSocket::RsThreadedTcpSocket(const std::string& tcp_address,uint16_t tcp_port) + : RsTcpSocket(tcp_address,tcp_port) +{ +} + +void RsThreadedTcpSocket::run() +{ + if(!connect()) + { + RsErr() << "Cannot connect socket to " << connectAddress() << ":" << connectPort() ; + return ; + } + + while(connectionState() == CONNECTED) + { + tick(); + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + } + RsWarn() << "Connection to " << connectAddress() << ":" << connectPort() << " is now closed."; +} + +RsThreadedTcpSocket::~RsThreadedTcpSocket() +{ + fullstop(); // fully wait for stopping. + + close(); +} diff --git a/libretroshare/src/pqi/rstcpsocket.h b/libretroshare/src/pqi/rstcpsocket.h new file mode 100644 index 000000000..bdc127f91 --- /dev/null +++ b/libretroshare/src/pqi/rstcpsocket.h @@ -0,0 +1,41 @@ +#include +#include "util/rsthreads.h" +#include "pqi/pqifdbin.h" + +class RsTcpSocket: public RsFdBinInterface +{ +public: + RsTcpSocket(const std::string& tcp_address,uint16_t tcp_port); + + enum State: uint8_t { + UNKNOWN = 0x00, + DISCONNECTED = 0x01, + CONNECTED = 0x02 + }; + + // Return 1 when OK, 0 otherwise. + int connect(); + + // Returns 1 when OK, 0 otherwise. + int close(); + + State connectionState() const { return mState; } + const std::string& connectAddress() const { return mConnectAddress ; } + uint16_t connectPort() const { return mConnectPort ; } + +private: + State mState; + std::string mConnectAddress; + uint16_t mConnectPort; + int mSocket; +}; + +class RsThreadedTcpSocket: public RsTcpSocket, public RsThread +{ +public: + RsThreadedTcpSocket(const std::string& tcp_address,uint16_t tcp_port); + virtual ~RsThreadedTcpSocket(); + + virtual void run() override; +}; +