using pqistreamer to deserialize data from FsNetwork

This commit is contained in:
csoler 2021-10-09 15:06:06 +02:00
parent d97ad8099c
commit 85bb831f47
3 changed files with 138 additions and 26 deletions

View file

@ -7,12 +7,14 @@ void FriendServer::threadTick()
{ {
// Listen to the network interface, capture incoming data etc. // Listen to the network interface, capture incoming data etc.
std::this_thread::sleep_for(std::chrono::seconds(1)); std::this_thread::sleep_for(std::chrono::milliseconds(200));
pqi->tick();
} }
FriendServer::FriendServer(const std::string& base_dir) FriendServer::FriendServer(const std::string& base_dir)
{ {
RsDbg() << "Creating friend server." << std::endl; RsDbg() << "Creating friend server." ;
mBaseDirectory = base_dir; mBaseDirectory = base_dir;
} }
@ -21,6 +23,7 @@ void FriendServer::run()
// 1 - create network interface. // 1 - create network interface.
mni = new FsNetworkInterface; mni = new FsNetworkInterface;
mni->start();
RsSerialiser *rss = new RsSerialiser ; RsSerialiser *rss = new RsSerialiser ;
rss->addSerialType(new FsSerializer) ; rss->addSerialType(new FsSerializer) ;

View file

@ -37,12 +37,14 @@
#include "network.h" #include "network.h"
FsNetworkInterface::FsNetworkInterface() FsNetworkInterface::FsNetworkInterface()
: mFsNiMtx(std::string("FsNetworkInterface"))
{ {
RS_STACK_MUTEX(mFsNiMtx);
mClintListn = 0; mClintListn = 0;
start(); mTotalReadBytes = 0;
} mTotalBufferBytes = 0;
void FsNetworkInterface::start()
{
struct sockaddr_in ipOfServer; struct sockaddr_in ipOfServer;
mClintListn = socket(AF_INET, SOCK_STREAM, 0); // creating socket mClintListn = socket(AF_INET, SOCK_STREAM, 0); // creating socket
@ -54,9 +56,9 @@ void FsNetworkInterface::start()
ipOfServer.sin_port = htons(2017); // this is the port number of running server ipOfServer.sin_port = htons(2017); // this is the port number of running server
bind(mClintListn, (struct sockaddr*)&ipOfServer , sizeof(ipOfServer)); bind(mClintListn, (struct sockaddr*)&ipOfServer , sizeof(ipOfServer));
listen(mClintListn , 20); listen(mClintListn , 40);
RsDbg() << "Network interface now listening for TCP on " << sockaddr_storage_tostring( *(sockaddr_storage*)&ipOfServer) << std::endl; RsDbg() << "Network interface now listening for TCP on " << sockaddr_storage_tostring( *(sockaddr_storage*)&ipOfServer) ;
} }
int FsNetworkInterface::close() int FsNetworkInterface::close()
@ -65,12 +67,26 @@ int FsNetworkInterface::close()
return 1; return 1;
} }
void FsNetworkInterface::threadTick()
{
tick();
}
int FsNetworkInterface::tick() int FsNetworkInterface::tick()
{ {
int clintConnt = accept(mClintListn, (struct sockaddr*)NULL, NULL); std::cerr << "ticking FsNetworkInterface" << std::endl;
int clintConnt = accept(mClintListn, (struct sockaddr*)NULL, NULL); // accept is a blocking call!
char inBuffer[1025]; char inBuffer[1025];
int readbytes = read(clintConnt, inBuffer, strlen(inBuffer)); memset(inBuffer,0,1025);
int readbytes = read(clintConnt, inBuffer, sizeof(inBuffer));
if(readbytes < 0)
RsErr() << "read() failed. Errno=" << errno ;
std::cerr << "clintConnt: " << clintConnt << ", readbytes: " << readbytes << std::endl;
::close(clintConnt); ::close(clintConnt);
@ -80,10 +96,97 @@ int FsNetworkInterface::tick()
{ {
RsDbg() << "Received the following bytes: " << RsUtil::BinToHex( reinterpret_cast<unsigned char*>(inBuffer),readbytes,50) << std::endl; RsDbg() << "Received the following bytes: " << RsUtil::BinToHex( reinterpret_cast<unsigned char*>(inBuffer),readbytes,50) << std::endl;
RsDbg() << "Received the following bytes: " << std::string(inBuffer,readbytes) << std::endl; RsDbg() << "Received the following bytes: " << std::string(inBuffer,readbytes) << std::endl;
void *ptr = malloc(readbytes);
if(!ptr)
throw std::runtime_error("Cannot allocate memory! Go buy some RAM!");
memcpy(ptr,inBuffer,readbytes);
RS_STACK_MUTEX(mFsNiMtx);
in_buffer.push_back(std::make_pair(ptr,readbytes));
mTotalBufferBytes += readbytes;
mTotalReadBytes += readbytes;
RsDbg() << "InBuffer: " << in_buffer.size() << " elements. Total size: " << mTotalBufferBytes << ". Total read: " << mTotalReadBytes ;
} }
else
std::this_thread::sleep_for(std::chrono::seconds(1));
return true; return true;
} }
int FsNetworkInterface::senddata(void *, int len)
{
RsErr() << "Trying to send data through FsNetworkInterface although it's not implemented yet!"<< std::endl;
return false;
}
int FsNetworkInterface::readdata(void *data, int len)
{
RS_STACK_MUTEX(mFsNiMtx);
// read incoming bytes in the buffer
int total_len = 0;
while(total_len < len)
{
if(in_buffer.empty())
{
mTotalBufferBytes -= total_len;
return total_len;
}
// If the remaining buffer is too large, chop of the beginning of it.
if(total_len + in_buffer.front().second > len)
{
memcpy(&(static_cast<unsigned char *>(data)[total_len]),in_buffer.front().first,len - total_len);
void *ptr = malloc(in_buffer.front().second - (len - total_len));
memcpy(ptr,&(static_cast<unsigned char*>(in_buffer.front().first)[len - total_len]),in_buffer.front().second - (len - total_len));
free(in_buffer.front().first);
in_buffer.front().first = ptr;
in_buffer.front().second -= len-total_len;
mTotalBufferBytes -= len;
return len;
}
else // copy everything
{
memcpy(&(static_cast<unsigned char *>(data)[total_len]),in_buffer.front().first,in_buffer.front().second);
total_len += in_buffer.front().second;
free(in_buffer.front().first);
in_buffer.pop_front();
}
}
mTotalBufferBytes -= len;
return len;
}
int FsNetworkInterface::netstatus()
{
return 1; // dummy response.
}
int FsNetworkInterface::isactive()
{
RS_STACK_MUTEX(mFsNiMtx);
return mClintListn > 0;
}
bool FsNetworkInterface::moretoread(uint32_t /* usec */)
{
RS_STACK_MUTEX(mFsNiMtx);
return mTotalBufferBytes > 0;
}
bool FsNetworkInterface::cansend(uint32_t)
{
return false;
}

View file

@ -24,39 +24,45 @@
#include "util/rsthreads.h" #include "util/rsthreads.h"
#include "pqi/pqi_base.h" #include "pqi/pqi_base.h"
class FsNetworkInterface: public BinInterface class FsNetworkInterface: public BinInterface, public RsTickingThread
{ {
public: public:
FsNetworkInterface() ; FsNetworkInterface() ;
void start() ; // Implements RsTickingThread
void threadTick() override;
// Implements BinInterface methods // Implements BinInterface methods
virtual int tick() override; int tick() override;
virtual int senddata(void *data, int len) override; int senddata(void *data, int len) override;
virtual int readdata(void *data, int len) override; int readdata(void *data, int len) override;
virtual int netstatus() override; int netstatus() override;
virtual int isactive() override; int isactive() override;
virtual bool moretoread(uint32_t usec) override; bool moretoread(uint32_t usec) override;
virtual bool cansend(uint32_t usec) override; bool cansend(uint32_t usec) override;
virtual int close() override; int close() override;
/** /**
* If hashing data * If hashing data
**/ **/
virtual RsFileHash gethash() override { return RsFileHash() ; } RsFileHash gethash() override { return RsFileHash() ; }
virtual uint64_t bytecount() override { return mTotalBytes; } uint64_t bytecount() override { return mTotalReadBytes; }
virtual bool bandwidthLimited() override { return false; } bool bandwidthLimited() override { return false; }
private: private:
RsMutex mFsNiMtx;
void initListening(); void initListening();
void stopListening(); void stopListening();
int mClintListn ; int mClintListn ;
uint64_t mTotalBytes; uint64_t mTotalReadBytes;
uint64_t mTotalBufferBytes;
std::list<std::pair<void *,int> > in_buffer;
}; };