diff --git a/retroshare-friendserver/src/friendserver.cc b/retroshare-friendserver/src/friendserver.cc index 10584d401..2913aa4a9 100644 --- a/retroshare-friendserver/src/friendserver.cc +++ b/retroshare-friendserver/src/friendserver.cc @@ -7,12 +7,14 @@ void FriendServer::threadTick() { // Listen to the network interface, capture incoming data etc. - std::this_thread::sleep_for(std::chrono::seconds(1)); + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + + pqi->tick(); } FriendServer::FriendServer(const std::string& base_dir) { - RsDbg() << "Creating friend server." << std::endl; + RsDbg() << "Creating friend server." ; mBaseDirectory = base_dir; } @@ -21,6 +23,7 @@ void FriendServer::run() // 1 - create network interface. mni = new FsNetworkInterface; + mni->start(); RsSerialiser *rss = new RsSerialiser ; rss->addSerialType(new FsSerializer) ; diff --git a/retroshare-friendserver/src/network.cc b/retroshare-friendserver/src/network.cc index 5aab08cae..a8207d732 100644 --- a/retroshare-friendserver/src/network.cc +++ b/retroshare-friendserver/src/network.cc @@ -37,12 +37,14 @@ #include "network.h" FsNetworkInterface::FsNetworkInterface() + : mFsNiMtx(std::string("FsNetworkInterface")) { + RS_STACK_MUTEX(mFsNiMtx); + mClintListn = 0; - start(); -} -void FsNetworkInterface::start() -{ + mTotalReadBytes = 0; + mTotalBufferBytes = 0; + struct sockaddr_in ipOfServer; mClintListn = socket(AF_INET, SOCK_STREAM, 0); // creating socket @@ -54,9 +56,9 @@ void FsNetworkInterface::start() ipOfServer.sin_port = htons(2017); // this is the port number of running server bind(mClintListn, (struct sockaddr*)&ipOfServer , sizeof(ipOfServer)); - listen(mClintListn , 20); + listen(mClintListn , 40); - RsDbg() << "Network interface now listening for TCP on " << sockaddr_storage_tostring( *(sockaddr_storage*)&ipOfServer) << std::endl; + RsDbg() << "Network interface now listening for TCP on " << sockaddr_storage_tostring( *(sockaddr_storage*)&ipOfServer) ; } int FsNetworkInterface::close() @@ -65,12 +67,26 @@ int FsNetworkInterface::close() return 1; } +void FsNetworkInterface::threadTick() +{ + tick(); +} + int FsNetworkInterface::tick() { - int clintConnt = accept(mClintListn, (struct sockaddr*)NULL, NULL); + std::cerr << "ticking FsNetworkInterface" << std::endl; + + int clintConnt = accept(mClintListn, (struct sockaddr*)NULL, NULL); // accept is a blocking call! char inBuffer[1025]; - int readbytes = read(clintConnt, inBuffer, strlen(inBuffer)); + memset(inBuffer,0,1025); + + int readbytes = read(clintConnt, inBuffer, sizeof(inBuffer)); + + if(readbytes < 0) + RsErr() << "read() failed. Errno=" << errno ; + + std::cerr << "clintConnt: " << clintConnt << ", readbytes: " << readbytes << std::endl; ::close(clintConnt); @@ -80,10 +96,97 @@ int FsNetworkInterface::tick() { RsDbg() << "Received the following bytes: " << RsUtil::BinToHex( reinterpret_cast(inBuffer),readbytes,50) << std::endl; RsDbg() << "Received the following bytes: " << std::string(inBuffer,readbytes) << std::endl; + + void *ptr = malloc(readbytes); + + if(!ptr) + throw std::runtime_error("Cannot allocate memory! Go buy some RAM!"); + + memcpy(ptr,inBuffer,readbytes); + + RS_STACK_MUTEX(mFsNiMtx); + in_buffer.push_back(std::make_pair(ptr,readbytes)); + + mTotalBufferBytes += readbytes; + mTotalReadBytes += readbytes; + + RsDbg() << "InBuffer: " << in_buffer.size() << " elements. Total size: " << mTotalBufferBytes << ". Total read: " << mTotalReadBytes ; } - else - std::this_thread::sleep_for(std::chrono::seconds(1)); return true; } +int FsNetworkInterface::senddata(void *, int len) +{ + RsErr() << "Trying to send data through FsNetworkInterface although it's not implemented yet!"<< std::endl; + return false; +} +int FsNetworkInterface::readdata(void *data, int len) +{ + RS_STACK_MUTEX(mFsNiMtx); + + // read incoming bytes in the buffer + + int total_len = 0; + + while(total_len < len) + { + if(in_buffer.empty()) + { + mTotalBufferBytes -= total_len; + return total_len; + } + + // If the remaining buffer is too large, chop of the beginning of it. + + if(total_len + in_buffer.front().second > len) + { + memcpy(&(static_cast(data)[total_len]),in_buffer.front().first,len - total_len); + + void *ptr = malloc(in_buffer.front().second - (len - total_len)); + memcpy(ptr,&(static_cast(in_buffer.front().first)[len - total_len]),in_buffer.front().second - (len - total_len)); + + free(in_buffer.front().first); + in_buffer.front().first = ptr; + in_buffer.front().second -= len-total_len; + + mTotalBufferBytes -= len; + return len; + } + else // copy everything + { + memcpy(&(static_cast(data)[total_len]),in_buffer.front().first,in_buffer.front().second); + + total_len += in_buffer.front().second; + + free(in_buffer.front().first); + in_buffer.pop_front(); + } + } + mTotalBufferBytes -= len; + return len; +} + +int FsNetworkInterface::netstatus() +{ + return 1; // dummy response. +} +int FsNetworkInterface::isactive() +{ + RS_STACK_MUTEX(mFsNiMtx); + return mClintListn > 0; +} +bool FsNetworkInterface::moretoread(uint32_t /* usec */) +{ + RS_STACK_MUTEX(mFsNiMtx); + return mTotalBufferBytes > 0; +} +bool FsNetworkInterface::cansend(uint32_t) +{ + return false; +} + + + + + diff --git a/retroshare-friendserver/src/network.h b/retroshare-friendserver/src/network.h index 8417927e8..e276a8d63 100644 --- a/retroshare-friendserver/src/network.h +++ b/retroshare-friendserver/src/network.h @@ -24,39 +24,45 @@ #include "util/rsthreads.h" #include "pqi/pqi_base.h" -class FsNetworkInterface: public BinInterface +class FsNetworkInterface: public BinInterface, public RsTickingThread { public: FsNetworkInterface() ; - void start() ; + // Implements RsTickingThread + + void threadTick() override; // Implements BinInterface methods - virtual int tick() override; + int tick() override; - virtual int senddata(void *data, int len) override; - virtual int readdata(void *data, int len) override; + int senddata(void *data, int len) override; + int readdata(void *data, int len) override; - virtual int netstatus() override; - virtual int isactive() override; - virtual bool moretoread(uint32_t usec) override; - virtual bool cansend(uint32_t usec) override; + int netstatus() override; + int isactive() override; + bool moretoread(uint32_t usec) override; + bool cansend(uint32_t usec) override; - virtual int close() override; + int close() override; /** * If hashing data **/ - virtual RsFileHash gethash() override { return RsFileHash() ; } - virtual uint64_t bytecount() override { return mTotalBytes; } + RsFileHash gethash() override { return RsFileHash() ; } + uint64_t bytecount() override { return mTotalReadBytes; } - virtual bool bandwidthLimited() override { return false; } + bool bandwidthLimited() override { return false; } private: + RsMutex mFsNiMtx; void initListening(); void stopListening(); int mClintListn ; - uint64_t mTotalBytes; + uint64_t mTotalReadBytes; + uint64_t mTotalBufferBytes; + + std::list > in_buffer; };