moved part of the code to libretroshare/src/friend_server

This commit is contained in:
csoler 2021-10-24 17:41:23 +02:00
parent c589561396
commit b731cf34ee
10 changed files with 267 additions and 256 deletions

View File

@ -0,0 +1,133 @@
FsBioInterface::FsBioInterface(int socket)
: mCLintConnt(socket)
{
mTotalReadBytes=0;
mTotalBufferBytes=0;
}
int FsBioInterface::tick()
{
std::cerr << "ticking FsNetworkInterface" << std::endl;
// 2 - read incoming data pending on existing connections
char inBuffer[1025];
memset(inBuffer,0,1025);
int readbytes = read(mCLintConnt, inBuffer, sizeof(inBuffer));
if(readbytes == 0)
{
std::cerr << "Reached END of the stream!" << std::endl;
return 0;
}
if(readbytes < 0)
{
if(errno != EWOULDBLOCK && errno != EAGAIN)
RsErr() << "read() failed. Errno=" << errno ;
return false;
}
std::cerr << "clintConnt: " << mCLintConnt << ", readbytes: " << readbytes << std::endl;
//::close(clintConnt);
// display some debug info
if(readbytes > 0)
{
RsDbg() << "Received the following bytes: " << RsUtil::BinToHex( reinterpret_cast<unsigned char*>(inBuffer),readbytes,50) << std::endl;
//RsDbg() << "Received the following bytes: " << std::string(inBuffer,readbytes) << std::endl;
void *ptr = malloc(readbytes);
if(!ptr)
throw std::runtime_error("Cannot allocate memory! Go buy some RAM!");
memcpy(ptr,inBuffer,readbytes);
in_buffer.push_back(std::make_pair(ptr,readbytes));
mTotalBufferBytes += readbytes;
mTotalReadBytes += readbytes;
std::cerr << "Socket: " << mCLintConnt << ". Total read: " << mTotalReadBytes << ". Buffer size: " << mTotalBufferBytes << std::endl ;
}
return true;
}
int FsBioInterface::readdata(void *data, int len)
{
// read incoming bytes in the buffer
int total_len = 0;
while(total_len < len)
{
if(in_buffer.empty())
{
mTotalBufferBytes -= total_len;
return total_len;
}
// If the remaining buffer is too large, chop of the beginning of it.
if(total_len + in_buffer.front().second > len)
{
memcpy(&(static_cast<unsigned char *>(data)[total_len]),in_buffer.front().first,len - total_len);
void *ptr = malloc(in_buffer.front().second - (len - total_len));
memcpy(ptr,&(static_cast<unsigned char*>(in_buffer.front().first)[len - total_len]),in_buffer.front().second - (len - total_len));
free(in_buffer.front().first);
in_buffer.front().first = ptr;
in_buffer.front().second -= len-total_len;
mTotalBufferBytes -= len;
return len;
}
else // copy everything
{
memcpy(&(static_cast<unsigned char *>(data)[total_len]),in_buffer.front().first,in_buffer.front().second);
total_len += in_buffer.front().second;
free(in_buffer.front().first);
in_buffer.pop_front();
}
}
mTotalBufferBytes -= len;
return len;
}
int FsBioInterface::senddata(void *data, int len)
{
// int written = write(mCLintConnt, data, len);
// return written;
return len;
}
int FsBioInterface::netstatus()
{
return 1; // dummy response.
}
int FsBioInterface::isactive()
{
return mCLintConnt > 0;
}
bool FsBioInterface::moretoread(uint32_t /* usec */)
{
return mTotalBufferBytes > 0;
}
bool FsBioInterface::cansend(uint32_t)
{
return isactive();
}
int FsBioInterface::close()
{
RsDbg() << "Stopping network interface" << std::endl;
return 1;
}

View File

@ -0,0 +1,35 @@
class FsBioInterface: public BinInterface
{
public:
FsBioInterface(int socket);
// Implements BinInterface methods
int tick() override;
int senddata(void *data, int len) override;
int readdata(void *data, int len) override;
int netstatus() override;
int isactive() override;
bool moretoread(uint32_t usec) override;
bool cansend(uint32_t usec) override;
int close() override;
/**
* If hashing data
**/
RsFileHash gethash() override { return RsFileHash() ; }
uint64_t bytecount() override { return mTotalReadBytes; }
bool bandwidthLimited() override { return false; }
private:
int mCLintConnt;
uint32_t mTotalReadBytes;
uint32_t mTotalBufferBytes;
std::list<std::pair<void *,int> > in_buffer;
};

View File

@ -0,0 +1,77 @@
FsClient::FsClient(const std::string& address)
: mServerAddress(address)
{
}
bool FsClient::sendItem(RsItem *item)
{
// open a connection
int CreateSocket = 0,n = 0;
char dataReceived[1024];
struct sockaddr_in ipOfServer;
memset(dataReceived, '0' ,sizeof(dataReceived));
if((CreateSocket = socket(AF_INET, SOCK_STREAM, 0))< 0)
{
printf("Socket not created \n");
return 1;
}
ipOfServer.sin_family = AF_INET;
ipOfServer.sin_port = htons(2017);
ipOfServer.sin_addr.s_addr = inet_addr("127.0.0.1");
if(connect(CreateSocket, (struct sockaddr *)&ipOfServer, sizeof(ipOfServer))<0)
{
printf("Connection failed due to port and ip problems, or server is not available\n");
return false;
}
// Serialise the item and send it.
uint32_t size = RsSerialiser::MAX_SERIAL_SIZE;
RsTemporaryMemory data(size);
if(!data)
{
RsErr() << "Cannot allocate memory to send item!" << std::endl;
return false;
}
FsSerializer *fss = new FsSerializer;
RsSerialiser rss;
rss.addSerialType(fss);
FsSerializer().serialise(item,data,&size);
// TODO: we should write in multiple chunks just in case the socket is not fully ready
write(CreateSocket,data,size);
// Now attempt to read and deserialize anything that comes back from that connexion
FsBioInterface bio(CreateSocket);
pqistreamer pqi(&rss,RsPeerId(),&bio,BIN_FLAGS_READABLE);
pqithreadstreamer p(&pqi,&rss,RsPeerId(),&bio,BIN_FLAGS_READABLE);
p.start();
while(true)
{
RsItem *item = p.GetItem();
if(!item)
{
rstime::rs_usleep(1000*200);
continue;
}
std::cerr << "Got a response item: " << std::endl;
std::cerr << *item << std::endl;
}
return 0;
// if ok, stream the item through it
}

View File

@ -0,0 +1,13 @@
// This class runs a client connection to the friend server. It opens a socket at each connection.
class FsClient
{
public:
FsClient(const std::string& address);
bool sendItem(RsItem *item);
private:
std::string mServerAddress;
};

View File

@ -404,6 +404,10 @@ HEADERS += pqi/authssl.h \
pqi/pqinetstatebox.h \
pqi/p3servicecontrol.h
SOURCES += friend_server/fsclient.h \
friend_server/fsbio.h \
friend_server/fsmanager.h
HEADERS += rsserver/p3face.h \
rsserver/p3history.h \
rsserver/p3msgs.h \
@ -569,6 +573,10 @@ SOURCES += pqi/authgpg.cc \
pqi/pqinetstatebox.cc \
pqi/p3servicecontrol.cc
SOURCES += friend_server/fsclient.cc \
friend_server/fsbio.cc \
friend_server/fsmanager.cc
SOURCES += rsserver/p3face-config.cc \
rsserver/p3face-server.cc \
rsserver/p3face-info.cc \

View File

@ -35,6 +35,7 @@
#include "util/rsdebug.h"
#include "pqi/pqithreadstreamer.h"
#include "friend_server/fsbio.h"
#include "network.h"
#include "fsitem.h"
@ -169,211 +170,3 @@ RsItem *FsNetworkInterface::GetItem()
return nullptr;
}
FsBioInterface::FsBioInterface(int socket)
: mCLintConnt(socket)
{
mTotalReadBytes=0;
mTotalBufferBytes=0;
}
int FsBioInterface::tick()
{
std::cerr << "ticking FsNetworkInterface" << std::endl;
// 2 - read incoming data pending on existing connections
char inBuffer[1025];
memset(inBuffer,0,1025);
int readbytes = read(mCLintConnt, inBuffer, sizeof(inBuffer));
if(readbytes == 0)
{
std::cerr << "Reached END of the stream!" << std::endl;
return 0;
}
if(readbytes < 0)
{
if(errno != EWOULDBLOCK && errno != EAGAIN)
RsErr() << "read() failed. Errno=" << errno ;
return false;
}
std::cerr << "clintConnt: " << mCLintConnt << ", readbytes: " << readbytes << std::endl;
//::close(clintConnt);
// display some debug info
if(readbytes > 0)
{
RsDbg() << "Received the following bytes: " << RsUtil::BinToHex( reinterpret_cast<unsigned char*>(inBuffer),readbytes,50) << std::endl;
//RsDbg() << "Received the following bytes: " << std::string(inBuffer,readbytes) << std::endl;
void *ptr = malloc(readbytes);
if(!ptr)
throw std::runtime_error("Cannot allocate memory! Go buy some RAM!");
memcpy(ptr,inBuffer,readbytes);
in_buffer.push_back(std::make_pair(ptr,readbytes));
mTotalBufferBytes += readbytes;
mTotalReadBytes += readbytes;
std::cerr << "Socket: " << mCLintConnt << ". Total read: " << mTotalReadBytes << ". Buffer size: " << mTotalBufferBytes << std::endl ;
}
return true;
}
int FsBioInterface::readdata(void *data, int len)
{
// read incoming bytes in the buffer
int total_len = 0;
while(total_len < len)
{
if(in_buffer.empty())
{
mTotalBufferBytes -= total_len;
return total_len;
}
// If the remaining buffer is too large, chop of the beginning of it.
if(total_len + in_buffer.front().second > len)
{
memcpy(&(static_cast<unsigned char *>(data)[total_len]),in_buffer.front().first,len - total_len);
void *ptr = malloc(in_buffer.front().second - (len - total_len));
memcpy(ptr,&(static_cast<unsigned char*>(in_buffer.front().first)[len - total_len]),in_buffer.front().second - (len - total_len));
free(in_buffer.front().first);
in_buffer.front().first = ptr;
in_buffer.front().second -= len-total_len;
mTotalBufferBytes -= len;
return len;
}
else // copy everything
{
memcpy(&(static_cast<unsigned char *>(data)[total_len]),in_buffer.front().first,in_buffer.front().second);
total_len += in_buffer.front().second;
free(in_buffer.front().first);
in_buffer.pop_front();
}
}
mTotalBufferBytes -= len;
return len;
}
int FsBioInterface::senddata(void *data, int len)
{
// int written = write(mCLintConnt, data, len);
// return written;
return len;
}
int FsBioInterface::netstatus()
{
return 1; // dummy response.
}
int FsBioInterface::isactive()
{
return mCLintConnt > 0;
}
bool FsBioInterface::moretoread(uint32_t /* usec */)
{
return mTotalBufferBytes > 0;
}
bool FsBioInterface::cansend(uint32_t)
{
return isactive();
}
int FsBioInterface::close()
{
RsDbg() << "Stopping network interface" << std::endl;
return 1;
}
FsClient::FsClient(const std::string& address)
: mServerAddress(address)
{
}
bool FsClient::sendItem(RsItem *item)
{
// open a connection
int CreateSocket = 0,n = 0;
char dataReceived[1024];
struct sockaddr_in ipOfServer;
memset(dataReceived, '0' ,sizeof(dataReceived));
if((CreateSocket = socket(AF_INET, SOCK_STREAM, 0))< 0)
{
printf("Socket not created \n");
return 1;
}
ipOfServer.sin_family = AF_INET;
ipOfServer.sin_port = htons(2017);
ipOfServer.sin_addr.s_addr = inet_addr("127.0.0.1");
if(connect(CreateSocket, (struct sockaddr *)&ipOfServer, sizeof(ipOfServer))<0)
{
printf("Connection failed due to port and ip problems, or server is not available\n");
return false;
}
// Serialise the item and send it.
uint32_t size = RsSerialiser::MAX_SERIAL_SIZE;
RsTemporaryMemory data(size);
if(!data)
{
RsErr() << "Cannot allocate memory to send item!" << std::endl;
return false;
}
FsSerializer *fss = new FsSerializer;
RsSerialiser rss;
rss.addSerialType(fss);
FsSerializer().serialise(item,data,&size);
write(CreateSocket,data,size);
// Now attempt to read and deserialize anything that comes back from that connexion
FsBioInterface bio(CreateSocket);
pqistreamer pqi(&rss,RsPeerId(),&bio,BIN_FLAGS_READABLE);
pqithreadstreamer p(&pqi,&rss,RsPeerId(),&bio,BIN_FLAGS_READABLE);
p.start();
while(true)
{
RsItem *item = p.GetItem();
if(!item)
{
rstime::rs_usleep(1000*200);
continue;
}
std::cerr << "Got a response item: " << std::endl;
std::cerr << *item << std::endl;
}
return 0;
// if ok, stream the item through it
}

View File

@ -33,41 +33,6 @@ struct ConnectionData
pqistreamer *pqi;
};
class FsBioInterface: public BinInterface
{
public:
FsBioInterface(int socket);
// Implements BinInterface methods
int tick() override;
int senddata(void *data, int len) override;
int readdata(void *data, int len) override;
int netstatus() override;
int isactive() override;
bool moretoread(uint32_t usec) override;
bool cansend(uint32_t usec) override;
int close() override;
/**
* If hashing data
**/
RsFileHash gethash() override { return RsFileHash() ; }
uint64_t bytecount() override { return mTotalReadBytes; }
bool bandwidthLimited() override { return false; }
private:
int mCLintConnt;
uint32_t mTotalReadBytes;
uint32_t mTotalBufferBytes;
std::list<std::pair<void *,int> > in_buffer;
};
// This class handles multiple connections to the server and supplies RsItem elements
class FsNetworkInterface: public RsTickingThread
@ -97,19 +62,6 @@ private:
std::map<RsPeerId,ConnectionData> mConnections;
};
// This class runs a client connection to the friend server. It opens a socket at each connection.
class FsClient
{
public:
FsClient(const std::string& address);
bool sendItem(RsItem *item);
private:
std::string mServerAddress;
};