diff --git a/libretroshare/src/friend_server/fsbio.cc b/libretroshare/src/friend_server/fsbio.cc new file mode 100644 index 000000000..4104c3c0d --- /dev/null +++ b/libretroshare/src/friend_server/fsbio.cc @@ -0,0 +1,133 @@ +FsBioInterface::FsBioInterface(int socket) + : mCLintConnt(socket) +{ + mTotalReadBytes=0; + mTotalBufferBytes=0; +} + +int FsBioInterface::tick() +{ + std::cerr << "ticking FsNetworkInterface" << std::endl; + + // 2 - read incoming data pending on existing connections + + char inBuffer[1025]; + memset(inBuffer,0,1025); + + int readbytes = read(mCLintConnt, inBuffer, sizeof(inBuffer)); + + if(readbytes == 0) + { + std::cerr << "Reached END of the stream!" << std::endl; + return 0; + } + if(readbytes < 0) + { + if(errno != EWOULDBLOCK && errno != EAGAIN) + RsErr() << "read() failed. Errno=" << errno ; + + return false; + } + + std::cerr << "clintConnt: " << mCLintConnt << ", readbytes: " << readbytes << std::endl; + + //::close(clintConnt); + + // display some debug info + + if(readbytes > 0) + { + RsDbg() << "Received the following bytes: " << RsUtil::BinToHex( reinterpret_cast(inBuffer),readbytes,50) << std::endl; + //RsDbg() << "Received the following bytes: " << std::string(inBuffer,readbytes) << std::endl; + + void *ptr = malloc(readbytes); + + if(!ptr) + throw std::runtime_error("Cannot allocate memory! Go buy some RAM!"); + + memcpy(ptr,inBuffer,readbytes); + + in_buffer.push_back(std::make_pair(ptr,readbytes)); + mTotalBufferBytes += readbytes; + mTotalReadBytes += readbytes; + + std::cerr << "Socket: " << mCLintConnt << ". Total read: " << mTotalReadBytes << ". Buffer size: " << mTotalBufferBytes << std::endl ; + } + + return true; +} + +int FsBioInterface::readdata(void *data, int len) +{ + // read incoming bytes in the buffer + + int total_len = 0; + + while(total_len < len) + { + if(in_buffer.empty()) + { + mTotalBufferBytes -= total_len; + return total_len; + } + + // If the remaining buffer is too large, chop of the beginning of it. + + if(total_len + in_buffer.front().second > len) + { + memcpy(&(static_cast(data)[total_len]),in_buffer.front().first,len - total_len); + + void *ptr = malloc(in_buffer.front().second - (len - total_len)); + memcpy(ptr,&(static_cast(in_buffer.front().first)[len - total_len]),in_buffer.front().second - (len - total_len)); + + free(in_buffer.front().first); + in_buffer.front().first = ptr; + in_buffer.front().second -= len-total_len; + + mTotalBufferBytes -= len; + return len; + } + else // copy everything + { + memcpy(&(static_cast(data)[total_len]),in_buffer.front().first,in_buffer.front().second); + + total_len += in_buffer.front().second; + + free(in_buffer.front().first); + in_buffer.pop_front(); + } + } + mTotalBufferBytes -= len; + return len; +} + +int FsBioInterface::senddata(void *data, int len) +{ +// int written = write(mCLintConnt, data, len); +// return written; + return len; +} +int FsBioInterface::netstatus() +{ + return 1; // dummy response. +} +int FsBioInterface::isactive() +{ + return mCLintConnt > 0; +} +bool FsBioInterface::moretoread(uint32_t /* usec */) +{ + return mTotalBufferBytes > 0; +} +bool FsBioInterface::cansend(uint32_t) +{ + return isactive(); +} + +int FsBioInterface::close() +{ + RsDbg() << "Stopping network interface" << std::endl; + return 1; +} + + diff --git a/libretroshare/src/friend_server/fsbio.h b/libretroshare/src/friend_server/fsbio.h new file mode 100644 index 000000000..2e872d48d --- /dev/null +++ b/libretroshare/src/friend_server/fsbio.h @@ -0,0 +1,35 @@ +class FsBioInterface: public BinInterface +{ +public: + FsBioInterface(int socket); + + // Implements BinInterface methods + + int tick() override; + + int senddata(void *data, int len) override; + int readdata(void *data, int len) override; + + int netstatus() override; + int isactive() override; + bool moretoread(uint32_t usec) override; + bool cansend(uint32_t usec) override; + + int close() override; + + /** + * If hashing data + **/ + RsFileHash gethash() override { return RsFileHash() ; } + uint64_t bytecount() override { return mTotalReadBytes; } + + bool bandwidthLimited() override { return false; } + +private: + int mCLintConnt; + uint32_t mTotalReadBytes; + uint32_t mTotalBufferBytes; + + std::list > in_buffer; +}; + diff --git a/libretroshare/src/friend_server/fsclient.cc b/libretroshare/src/friend_server/fsclient.cc new file mode 100644 index 000000000..6ee8518af --- /dev/null +++ b/libretroshare/src/friend_server/fsclient.cc @@ -0,0 +1,77 @@ +FsClient::FsClient(const std::string& address) + : mServerAddress(address) +{ +} + +bool FsClient::sendItem(RsItem *item) +{ + // open a connection + + int CreateSocket = 0,n = 0; + char dataReceived[1024]; + struct sockaddr_in ipOfServer; + + memset(dataReceived, '0' ,sizeof(dataReceived)); + + if((CreateSocket = socket(AF_INET, SOCK_STREAM, 0))< 0) + { + printf("Socket not created \n"); + return 1; + } + + ipOfServer.sin_family = AF_INET; + ipOfServer.sin_port = htons(2017); + ipOfServer.sin_addr.s_addr = inet_addr("127.0.0.1"); + + if(connect(CreateSocket, (struct sockaddr *)&ipOfServer, sizeof(ipOfServer))<0) + { + printf("Connection failed due to port and ip problems, or server is not available\n"); + return false; + } + + // Serialise the item and send it. + + uint32_t size = RsSerialiser::MAX_SERIAL_SIZE; + RsTemporaryMemory data(size); + + if(!data) + { + RsErr() << "Cannot allocate memory to send item!" << std::endl; + return false; + } + + FsSerializer *fss = new FsSerializer; + RsSerialiser rss; + rss.addSerialType(fss); + + FsSerializer().serialise(item,data,&size); + + // TODO: we should write in multiple chunks just in case the socket is not fully ready + write(CreateSocket,data,size); + + // Now attempt to read and deserialize anything that comes back from that connexion + + FsBioInterface bio(CreateSocket); + pqistreamer pqi(&rss,RsPeerId(),&bio,BIN_FLAGS_READABLE); + pqithreadstreamer p(&pqi,&rss,RsPeerId(),&bio,BIN_FLAGS_READABLE); + p.start(); + + while(true) + { + RsItem *item = p.GetItem(); + + if(!item) + { + rstime::rs_usleep(1000*200); + continue; + } + + std::cerr << "Got a response item: " << std::endl; + std::cerr << *item << std::endl; + } + + return 0; + + // if ok, stream the item through it +} + diff --git a/libretroshare/src/friend_server/fsclient.h b/libretroshare/src/friend_server/fsclient.h new file mode 100644 index 000000000..6e79ed1b3 --- /dev/null +++ b/libretroshare/src/friend_server/fsclient.h @@ -0,0 +1,13 @@ +// This class runs a client connection to the friend server. It opens a socket at each connection. + +class FsClient +{ +public: + FsClient(const std::string& address); + + bool sendItem(RsItem *item); + +private: + std::string mServerAddress; +}; + diff --git a/retroshare-friendserver/src/fsitem.h b/libretroshare/src/friend_server/fsitem.h similarity index 100% rename from retroshare-friendserver/src/fsitem.h rename to libretroshare/src/friend_server/fsitem.h diff --git a/libretroshare/src/friend_server/fsmanager.cc b/libretroshare/src/friend_server/fsmanager.cc new file mode 100644 index 000000000..e69de29bb diff --git a/libretroshare/src/friend_server/fsmanager.h b/libretroshare/src/friend_server/fsmanager.h new file mode 100644 index 000000000..e69de29bb diff --git a/libretroshare/src/libretroshare.pro b/libretroshare/src/libretroshare.pro index caaabd0b7..d6f0397df 100644 --- a/libretroshare/src/libretroshare.pro +++ b/libretroshare/src/libretroshare.pro @@ -404,6 +404,10 @@ HEADERS += pqi/authssl.h \ pqi/pqinetstatebox.h \ pqi/p3servicecontrol.h +SOURCES += friend_server/fsclient.h \ + friend_server/fsbio.h \ + friend_server/fsmanager.h + HEADERS += rsserver/p3face.h \ rsserver/p3history.h \ rsserver/p3msgs.h \ @@ -569,6 +573,10 @@ SOURCES += pqi/authgpg.cc \ pqi/pqinetstatebox.cc \ pqi/p3servicecontrol.cc +SOURCES += friend_server/fsclient.cc \ + friend_server/fsbio.cc \ + friend_server/fsmanager.cc + SOURCES += rsserver/p3face-config.cc \ rsserver/p3face-server.cc \ rsserver/p3face-info.cc \ diff --git a/retroshare-friendserver/src/network.cc b/retroshare-friendserver/src/network.cc index d309f8a13..c5451e9b1 100644 --- a/retroshare-friendserver/src/network.cc +++ b/retroshare-friendserver/src/network.cc @@ -35,6 +35,7 @@ #include "util/rsdebug.h" #include "pqi/pqithreadstreamer.h" +#include "friend_server/fsbio.h" #include "network.h" #include "fsitem.h" @@ -169,211 +170,3 @@ RsItem *FsNetworkInterface::GetItem() return nullptr; } -FsBioInterface::FsBioInterface(int socket) - : mCLintConnt(socket) -{ - mTotalReadBytes=0; - mTotalBufferBytes=0; -} - -int FsBioInterface::tick() -{ - std::cerr << "ticking FsNetworkInterface" << std::endl; - - // 2 - read incoming data pending on existing connections - - char inBuffer[1025]; - memset(inBuffer,0,1025); - - int readbytes = read(mCLintConnt, inBuffer, sizeof(inBuffer)); - - if(readbytes == 0) - { - std::cerr << "Reached END of the stream!" << std::endl; - return 0; - } - if(readbytes < 0) - { - if(errno != EWOULDBLOCK && errno != EAGAIN) - RsErr() << "read() failed. Errno=" << errno ; - - return false; - } - - std::cerr << "clintConnt: " << mCLintConnt << ", readbytes: " << readbytes << std::endl; - - //::close(clintConnt); - - // display some debug info - - if(readbytes > 0) - { - RsDbg() << "Received the following bytes: " << RsUtil::BinToHex( reinterpret_cast(inBuffer),readbytes,50) << std::endl; - //RsDbg() << "Received the following bytes: " << std::string(inBuffer,readbytes) << std::endl; - - void *ptr = malloc(readbytes); - - if(!ptr) - throw std::runtime_error("Cannot allocate memory! Go buy some RAM!"); - - memcpy(ptr,inBuffer,readbytes); - - in_buffer.push_back(std::make_pair(ptr,readbytes)); - mTotalBufferBytes += readbytes; - mTotalReadBytes += readbytes; - - std::cerr << "Socket: " << mCLintConnt << ". Total read: " << mTotalReadBytes << ". Buffer size: " << mTotalBufferBytes << std::endl ; - } - - return true; -} - -int FsBioInterface::readdata(void *data, int len) -{ - // read incoming bytes in the buffer - - int total_len = 0; - - while(total_len < len) - { - if(in_buffer.empty()) - { - mTotalBufferBytes -= total_len; - return total_len; - } - - // If the remaining buffer is too large, chop of the beginning of it. - - if(total_len + in_buffer.front().second > len) - { - memcpy(&(static_cast(data)[total_len]),in_buffer.front().first,len - total_len); - - void *ptr = malloc(in_buffer.front().second - (len - total_len)); - memcpy(ptr,&(static_cast(in_buffer.front().first)[len - total_len]),in_buffer.front().second - (len - total_len)); - - free(in_buffer.front().first); - in_buffer.front().first = ptr; - in_buffer.front().second -= len-total_len; - - mTotalBufferBytes -= len; - return len; - } - else // copy everything - { - memcpy(&(static_cast(data)[total_len]),in_buffer.front().first,in_buffer.front().second); - - total_len += in_buffer.front().second; - - free(in_buffer.front().first); - in_buffer.pop_front(); - } - } - mTotalBufferBytes -= len; - return len; -} - -int FsBioInterface::senddata(void *data, int len) -{ -// int written = write(mCLintConnt, data, len); -// return written; - return len; -} -int FsBioInterface::netstatus() -{ - return 1; // dummy response. -} -int FsBioInterface::isactive() -{ - return mCLintConnt > 0; -} -bool FsBioInterface::moretoread(uint32_t /* usec */) -{ - return mTotalBufferBytes > 0; -} -bool FsBioInterface::cansend(uint32_t) -{ - return isactive(); -} - -int FsBioInterface::close() -{ - RsDbg() << "Stopping network interface" << std::endl; - return 1; -} - - -FsClient::FsClient(const std::string& address) - : mServerAddress(address) -{ -} - -bool FsClient::sendItem(RsItem *item) -{ - // open a connection - - int CreateSocket = 0,n = 0; - char dataReceived[1024]; - struct sockaddr_in ipOfServer; - - memset(dataReceived, '0' ,sizeof(dataReceived)); - - if((CreateSocket = socket(AF_INET, SOCK_STREAM, 0))< 0) - { - printf("Socket not created \n"); - return 1; - } - - ipOfServer.sin_family = AF_INET; - ipOfServer.sin_port = htons(2017); - ipOfServer.sin_addr.s_addr = inet_addr("127.0.0.1"); - - if(connect(CreateSocket, (struct sockaddr *)&ipOfServer, sizeof(ipOfServer))<0) - { - printf("Connection failed due to port and ip problems, or server is not available\n"); - return false; - } - - // Serialise the item and send it. - - uint32_t size = RsSerialiser::MAX_SERIAL_SIZE; - RsTemporaryMemory data(size); - - if(!data) - { - RsErr() << "Cannot allocate memory to send item!" << std::endl; - return false; - } - - FsSerializer *fss = new FsSerializer; - RsSerialiser rss; - rss.addSerialType(fss); - - FsSerializer().serialise(item,data,&size); - - write(CreateSocket,data,size); - - // Now attempt to read and deserialize anything that comes back from that connexion - - FsBioInterface bio(CreateSocket); - pqistreamer pqi(&rss,RsPeerId(),&bio,BIN_FLAGS_READABLE); - pqithreadstreamer p(&pqi,&rss,RsPeerId(),&bio,BIN_FLAGS_READABLE); - p.start(); - - while(true) - { - RsItem *item = p.GetItem(); - - if(!item) - { - rstime::rs_usleep(1000*200); - continue; - } - - std::cerr << "Got a response item: " << std::endl; - std::cerr << *item << std::endl; - } - - return 0; - - // if ok, stream the item through it -} diff --git a/retroshare-friendserver/src/network.h b/retroshare-friendserver/src/network.h index 37ced7624..e5ff4bc3c 100644 --- a/retroshare-friendserver/src/network.h +++ b/retroshare-friendserver/src/network.h @@ -33,41 +33,6 @@ struct ConnectionData pqistreamer *pqi; }; -class FsBioInterface: public BinInterface -{ -public: - FsBioInterface(int socket); - - // Implements BinInterface methods - - int tick() override; - - int senddata(void *data, int len) override; - int readdata(void *data, int len) override; - - int netstatus() override; - int isactive() override; - bool moretoread(uint32_t usec) override; - bool cansend(uint32_t usec) override; - - int close() override; - - /** - * If hashing data - **/ - RsFileHash gethash() override { return RsFileHash() ; } - uint64_t bytecount() override { return mTotalReadBytes; } - - bool bandwidthLimited() override { return false; } - -private: - int mCLintConnt; - uint32_t mTotalReadBytes; - uint32_t mTotalBufferBytes; - - std::list > in_buffer; -}; - // This class handles multiple connections to the server and supplies RsItem elements class FsNetworkInterface: public RsTickingThread @@ -97,19 +62,6 @@ private: std::map mConnections; }; -// This class runs a client connection to the friend server. It opens a socket at each connection. - -class FsClient -{ -public: - FsClient(const std::string& address); - - bool sendItem(RsItem *item); - -private: - std::string mServerAddress; -}; -