created independent tcpsocket class to be used also in TorManager

This commit is contained in:
csoler 2021-11-14 23:31:40 +01:00
parent bef780e0c7
commit a5b1f2d976
5 changed files with 263 additions and 21 deletions

View File

@ -24,16 +24,43 @@
#include "fsbio.h"
FsBioInterface::FsBioInterface(int socket)
: mCLintConnt(socket),mIsActive(true)
: mCLintConnt(socket),mIsActive(socket!=0)
{
mTotalReadBytes=0;
mTotalBufferBytes=0;
mTotalInBufferBytes=0;
mTotalWrittenBytes=0;
mTotalOutBufferBytes=0;
}
void FsBioInterface::setSocket(int s)
{
if(mIsActive != 0)
{
RsErr() << "Changing socket to active FsBioInterface! Canceling all pending R/W data." ;
close();
}
mCLintConnt = s;
mIsActive = (s!=0);
}
int FsBioInterface::tick()
{
if(!mIsActive)
{
RsErr() << "Ticking a non active FsBioInterface!" ;
return 0;
}
// 2 - read incoming data pending on existing connections
int res=0;
res += read_pending();
res += write_pending();
return res;
}
int FsBioInterface::read_pending()
{
char inBuffer[1025];
memset(inBuffer,0,1025);
@ -45,14 +72,14 @@ int FsBioInterface::tick()
RsDbg() << "Closing!" ;
close();
return mTotalBufferBytes;
return mTotalInBufferBytes;
}
if(readbytes < 0)
{
if(errno != EWOULDBLOCK && errno != EAGAIN)
RsErr() << "read() failed. Errno=" << errno ;
return mTotalBufferBytes;
return mTotalInBufferBytes;
}
RsDbg() << "clintConnt: " << mCLintConnt << ", readbytes: " << readbytes ;
@ -72,15 +99,80 @@ int FsBioInterface::tick()
memcpy(ptr,inBuffer,readbytes);
in_buffer.push_back(std::make_pair(ptr,readbytes));
mTotalBufferBytes += readbytes;
mTotalInBufferBytes += readbytes;
mTotalReadBytes += readbytes;
RsDbg() << "Socket: " << mCLintConnt << ". Total read: " << mTotalReadBytes << ". Buffer size: " << mTotalBufferBytes ;
RsDbg() << "Socket: " << mCLintConnt << ". Total read: " << mTotalReadBytes << ". Buffer size: " << mTotalInBufferBytes ;
}
return mTotalBufferBytes;
return mTotalInBufferBytes;
}
int FsBioInterface::write_pending()
{
if(out_buffer.empty())
return mTotalOutBufferBytes;
auto& p = out_buffer.front();
int written = write(mCLintConnt, p.first, p.second);
if(written < 0)
{
if(errno != EWOULDBLOCK && errno != EAGAIN)
RsErr() << "write() failed. Errno=" << errno ;
return mTotalOutBufferBytes;
}
if(written == 0)
{
RsErr() << "write() failed. Nothing sent.";
return mTotalOutBufferBytes;
}
RsDbg() << "clintConnt: " << mCLintConnt << ", written: " << written ;
// display some debug info
RsDbg() << "Sent the following bytes: " << RsUtil::BinToHex( reinterpret_cast<unsigned char*>(p.first),written,50) << std::endl;
if(written < p.second)
{
void *ptr = malloc(p.second - written);
if(!ptr)
throw std::runtime_error("Cannot allocate memory! Go buy some RAM!");
memcpy(ptr,static_cast<unsigned char *>(p.first) + written,p.second - written);
free(p.first);
out_buffer.front().first = ptr;
out_buffer.front().second = p.second - written;
}
else
{
free(p.first);
out_buffer.pop_front();
}
mTotalOutBufferBytes -= written;
mTotalWrittenBytes += written;
return mTotalOutBufferBytes;
}
FsBioInterface::~FsBioInterface()
{
clean();
}
void FsBioInterface::clean()
{
for(auto p:in_buffer) free(p.first);
for(auto p:out_buffer) free(p.first);
in_buffer.clear();
out_buffer.clear();
}
int FsBioInterface::readdata(void *data, int len)
{
// read incoming bytes in the buffer
@ -91,7 +183,7 @@ int FsBioInterface::readdata(void *data, int len)
{
if(in_buffer.empty())
{
mTotalBufferBytes -= total_len;
mTotalInBufferBytes -= total_len;
return total_len;
}
@ -108,7 +200,7 @@ int FsBioInterface::readdata(void *data, int len)
in_buffer.front().first = ptr;
in_buffer.front().second -= len-total_len;
mTotalBufferBytes -= len;
mTotalInBufferBytes -= len;
return len;
}
else // copy everything
@ -121,7 +213,7 @@ int FsBioInterface::readdata(void *data, int len)
in_buffer.pop_front();
}
}
mTotalBufferBytes -= len;
mTotalInBufferBytes -= len;
return len;
}
@ -129,12 +221,24 @@ int FsBioInterface::senddata(void *data, int len)
{
// shouldn't we better send in multiple packets, similarly to how we read?
RsDbg() << "FsBioInterface: sending data packet of size " << len ;
if(len == 0)
{
RsErr() << "Calling FsBioInterface::senddata() with null size or null data pointer";
return 0;
}
void *ptr = malloc(len);
int written = write(mCLintConnt, data, len);
RsDbg() << "FsBioInterface: done.";
if(!ptr)
{
RsErr() << "Cannot allocate data of size " << len ;
return 0;
}
return written;
memcpy(ptr,data,len);
out_buffer.push_back(std::make_pair(ptr,len));
mTotalOutBufferBytes += len;
return len;
}
int FsBioInterface::netstatus()
{
@ -148,7 +252,7 @@ int FsBioInterface::isactive()
bool FsBioInterface::moretoread(uint32_t /* usec */)
{
return mTotalBufferBytes > 0;
return mTotalInBufferBytes > 0;
}
bool FsBioInterface::cansend(uint32_t)
{
@ -159,6 +263,9 @@ int FsBioInterface::close()
{
RsDbg() << "Stopping network interface" << std::endl;
mIsActive = false;
mCLintConnt = 0;
clean();
return 1;
}

View File

@ -26,12 +26,19 @@ class FsBioInterface: public BinInterface
{
public:
FsBioInterface(int socket);
~FsBioInterface();
// Implements BinInterface methods
int tick() override;
// Schedule data to be sent at the next tick(). The caller keeps memory ownership.
//
int senddata(void *data, int len) override;
// Obtains new data from the interface. "data" needs to be initialized for room
// to len bytes. The returned value is the actual size of what was read.
//
int readdata(void *data, int len) override;
int netstatus() override;
@ -49,12 +56,22 @@ public:
bool bandwidthLimited() override { return false; }
protected:
void setSocket(int s);
void clean();
private:
int read_pending();
int write_pending();
int mCLintConnt;
bool mIsActive;
uint32_t mTotalReadBytes;
uint32_t mTotalBufferBytes;
uint32_t mTotalInBufferBytes;
uint32_t mTotalWrittenBytes;
uint32_t mTotalOutBufferBytes;
std::list<std::pair<void *,int> > in_buffer;
std::list<std::pair<void *,int> > out_buffer;
};

View File

@ -0,0 +1,75 @@
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <string.h>
#include <iostream>
#include "tcpsocket.h"
TcpSocket::TcpSocket(const std::string& tcp_address,uint16_t tcp_port)
:FsBioInterface(0),mState(DISCONNECTED),mConnectAddress(tcp_address),mConnectPort(tcp_port),mSocket(0)
{
}
int TcpSocket::connect()
{
int CreateSocket = 0;
char dataReceived[1024];
struct sockaddr_in ipOfServer;
memset(dataReceived, '0' ,sizeof(dataReceived));
if((CreateSocket = socket(AF_INET, SOCK_STREAM, 0))< 0)
{
printf("Socket not created \n");
return false;
}
ipOfServer.sin_family = AF_INET;
ipOfServer.sin_port = htons(mConnectPort);
ipOfServer.sin_addr.s_addr = inet_addr(mConnectAddress.c_str());
if(::connect(mSocket, (struct sockaddr *)&ipOfServer, sizeof(ipOfServer))<0)
{
printf("Connection failed due to port and ip problems, or server is not available\n");
return false;
}
mState = CONNECTED;
setSocket(mSocket);
return true;
}
int TcpSocket::close()
{
FsBioInterface::close();
return !::close(mSocket);
}
ThreadedTcpSocket::ThreadedTcpSocket(const std::string& tcp_address,uint16_t tcp_port)
: TcpSocket(tcp_address,tcp_port)
{
}
void ThreadedTcpSocket::run()
{
if(!connect())
{
RsErr() << "Cannot connect socket to " << connectAddress() << ":" << connectPort() ;
return ;
}
while(connectionState() == CONNECTED)
{
tick();
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
RsWarn() << "Connection to " << connectAddress() << ":" << connectPort() << " is now closed.";
}
ThreadedTcpSocket::~ThreadedTcpSocket()
{
fullstop(); // fully wait for stopping.
close();
}

View File

@ -0,0 +1,41 @@
#include <string>
#include "util/rsthreads.h"
#include "friend_server/fsbio.h"
class TcpSocket: public FsBioInterface
{
public:
TcpSocket(const std::string& tcp_address,uint16_t tcp_port);
enum State: uint8_t {
UNKNOWN = 0x00,
DISCONNECTED = 0x01,
CONNECTED = 0x02
};
// Return 1 when OK, 0 otherwise.
int connect();
// Returns 1 when OK, 0 otherwise.
int close();
State connectionState() const { return mState; }
const std::string& connectAddress() const { return mConnectAddress ; }
uint16_t connectPort() const { return mConnectPort ; }
private:
State mState;
std::string mConnectAddress;
uint16_t mConnectPort;
int mSocket;
};
class ThreadedTcpSocket: public TcpSocket, public RsThread
{
public:
ThreadedTcpSocket(const std::string& tcp_address,uint16_t tcp_port);
virtual ~ThreadedTcpSocket();
virtual void run() override;
};

View File

@ -156,6 +156,7 @@ rs_webui {
}
HEADERS += plugins/pluginmanager.h \
friend_server/socketbio.h \
plugins/dlfcn_win32.h \
rsitems/rspluginitems.h \
util/i2pcommon.h \
@ -404,9 +405,10 @@ HEADERS += pqi/authssl.h \
pqi/p3servicecontrol.h
SOURCES += friend_server/fsclient.h \
friend_server/fsbio.h \
friend_server/fsitem.h \
friend_server/fsmanager.h
friend_server/fsmanager.h \
friend_server/socketbio.cc \
friend_server/tcpsocket.h
HEADERS += rsserver/p3face.h \
rsserver/p3history.h \
@ -572,8 +574,8 @@ SOURCES += pqi/authgpg.cc \
pqi/p3servicecontrol.cc
SOURCES += friend_server/fsclient.cc \
friend_server/fsbio.cc \
friend_server/fsmanager.cc
friend_server/fsmanager.cc \
friend_server/tcpsocket.cc
SOURCES += rsserver/p3face-config.cc \
rsserver/p3face-server.cc \