diff --git a/libretroshare/src/friend_server/fsbio.cc b/libretroshare/src/friend_server/fsbio.cc index 517738225..ecff610ea 100644 --- a/libretroshare/src/friend_server/fsbio.cc +++ b/libretroshare/src/friend_server/fsbio.cc @@ -24,7 +24,7 @@ #include "fsbio.h" FsBioInterface::FsBioInterface(int socket) - : mCLintConnt(socket) + : mCLintConnt(socket),mIsActive(true) { mTotalReadBytes=0; mTotalBufferBytes=0; @@ -43,21 +43,22 @@ int FsBioInterface::tick() if(readbytes == 0) { - std::cerr << "Reached END of the stream!" << std::endl; - return 0; + RsDbg() << "Reached END of the stream!" << std::endl; + RsDbg() << "Closing!" << std::endl; + + mIsActive = false; + return mTotalBufferBytes; } if(readbytes < 0) { if(errno != EWOULDBLOCK && errno != EAGAIN) RsErr() << "read() failed. Errno=" << errno ; - return false; + return mTotalBufferBytes; } std::cerr << "clintConnt: " << mCLintConnt << ", readbytes: " << readbytes << std::endl; - //::close(clintConnt); - // display some debug info if(readbytes > 0) @@ -79,7 +80,7 @@ int FsBioInterface::tick() std::cerr << "Socket: " << mCLintConnt << ". Total read: " << mTotalReadBytes << ". Buffer size: " << mTotalBufferBytes << std::endl ; } - return true; + return mTotalBufferBytes; } int FsBioInterface::readdata(void *data, int len) @@ -134,7 +135,7 @@ int FsBioInterface::senddata(void *data, int len) } int FsBioInterface::netstatus() { - return 1; // dummy response. + return mIsActive; // dummy response. } int FsBioInterface::isactive() @@ -154,6 +155,7 @@ bool FsBioInterface::cansend(uint32_t) int FsBioInterface::close() { RsDbg() << "Stopping network interface" << std::endl; + mIsActive = false; return 1; } diff --git a/libretroshare/src/friend_server/fsbio.h b/libretroshare/src/friend_server/fsbio.h index 446ad29f8..6b2ed06bd 100644 --- a/libretroshare/src/friend_server/fsbio.h +++ b/libretroshare/src/friend_server/fsbio.h @@ -51,6 +51,7 @@ public: private: int mCLintConnt; + bool mIsActive; uint32_t mTotalReadBytes; uint32_t mTotalBufferBytes; diff --git a/libretroshare/src/friend_server/fsclient.cc b/libretroshare/src/friend_server/fsclient.cc index 2f486122f..377055505 100644 --- a/libretroshare/src/friend_server/fsclient.cc +++ b/libretroshare/src/friend_server/fsclient.cc @@ -117,7 +117,7 @@ bool FsClient::sendItem(const std::string& address,uint16_t port,RsItem *item,st // TODO: we should write in multiple chunks just in case the socket is not fully ready write(CreateSocket,data,size); - // Now attempt to read and deserialize anything that comes back from that connexion + // Now attempt to read and deserialize anything that comes back from that connexion until it gets closed by the server. FsBioInterface bio(CreateSocket); pqistreamer pqi(&rss,RsPeerId(),&bio,BIN_FLAGS_READABLE); @@ -128,14 +128,26 @@ bool FsClient::sendItem(const std::string& address,uint16_t port,RsItem *item,st { RsItem *item = p.GetItem(); - if(!item) + if(item) + { + response.push_back(item); + std::cerr << "Got a response item: " << std::endl; + std::cerr << *item << std::endl; + } + else { std::this_thread::sleep_for(std::chrono::milliseconds(200)); continue; } - std::cerr << "Got a response item: " << std::endl; - std::cerr << *item << std::endl; + if(!bio.isactive()) // socket has probably closed + { + RsDbg() << "(client side) Socket has been closed by server. Killing pqistreamer and closing socket." ; + p.fullstop(); + + close(CreateSocket); + CreateSocket=0; + } } return 0; diff --git a/retroshare-friendserver/src/friendserver.cc b/retroshare-friendserver/src/friendserver.cc index fa4591955..32d3ea8a2 100644 --- a/retroshare-friendserver/src/friendserver.cc +++ b/retroshare-friendserver/src/friendserver.cc @@ -37,7 +37,15 @@ void FriendServer::threadTick() void FriendServer::handleClientPublish(const RsFriendServerClientPublishItem *item) { - RsDbg() << "Received a client publish item:" << *item ; + RsDbg() << "Received a client publish item from " << item->PeerId() << ":" << *item ; + + // Respond with a list of potential friends + + // Close client connection from server side, to tell the client that nothing more is coming. + + RsDbg() << "Closing client connection." ; + + mni->closeConnection(item->PeerId()); } void FriendServer::handleClientRemove(const RsFriendServerClientRemoveItem *item) { diff --git a/retroshare-friendserver/src/network.cc b/retroshare-friendserver/src/network.cc index 5f697e9b0..33c26c297 100644 --- a/retroshare-friendserver/src/network.cc +++ b/retroshare-friendserver/src/network.cc @@ -41,12 +41,11 @@ #include "friend_server/fsitem.h" FsNetworkInterface::FsNetworkInterface() - : mFsNiMtx(std::string("FsNetworkInterface")) + : PQInterface(RsPeerId()),mFsNiMtx(std::string("FsNetworkInterface")) { RS_STACK_MUTEX(mFsNiMtx); mClintListn = 0; - mClintListn = socket(AF_INET, SOCK_STREAM, 0); // creating socket int flags = fcntl(mClintListn, F_GETFL); @@ -76,9 +75,10 @@ FsNetworkInterface::FsNetworkInterface() FsNetworkInterface::~FsNetworkInterface() { + RS_STACK_MUTEX(mFsNiMtx); for(auto& it:mConnections) { - delete it.second.pqi; + delete it.second.pqi_thread; std::cerr << "Releasing socket " << it.second.socket << std::endl; close(it.second.socket); } @@ -93,8 +93,11 @@ void FsNetworkInterface::threadTick() // 2 - tick all streamers + RS_STACK_MUTEX(mFsNiMtx); for(auto& it:mConnections) - it.second.pqi->tick(); + { + it.second.pqi_thread->tick(); + } rstime::rs_usleep(1000*200); } @@ -102,6 +105,8 @@ void FsNetworkInterface::threadTick() static RsPeerId makePeerId(int t) { unsigned char s[RsPeerId::SIZE_IN_BYTES]; + memset(s,0,sizeof(s)); + *reinterpret_cast(&s) = t; return RsPeerId::fromBufferUnsafe(s); } @@ -145,28 +150,110 @@ bool FsNetworkInterface::checkForNewConnections() FsBioInterface *bio = new FsBioInterface(clintConnt); - auto p = new pqistreamer(rss, pid, bio,BIN_FLAGS_READABLE | BIN_FLAGS_WRITEABLE); - auto pqi = new pqithreadstreamer(p,rss, pid, bio,BIN_FLAGS_READABLE | BIN_FLAGS_WRITEABLE); - c.pqi = pqi; + auto pqi = new pqithreadstreamer(this,rss, pid, bio,BIN_FLAGS_READABLE | BIN_FLAGS_WRITEABLE); + + c.pqi_thread = pqi; + c.bio = bio; pqi->start(); RS_STACK_MUTEX(mFsNiMtx); - mConnections[makePeerId(clintConnt)] = c; + mConnections[pid] = c; return true; } +bool FsNetworkInterface::RecvItem(RsItem *item) +{ + RS_STACK_MUTEX(mFsNiMtx); + + auto it = mConnections.find(item->PeerId()); + + if(it == mConnections.end()) + { + RsErr() << "Receiving an item for peer ID " << item->PeerId() << " but no connection is known for that peer." << std::endl; + delete item; + return false; + } + + it->second.incoming_items.push_back(item); + return true; +} + RsItem *FsNetworkInterface::GetItem() { RS_STACK_MUTEX(mFsNiMtx); for(auto& it:mConnections) { - RsItem *item = it.second.pqi->GetItem(); - if(item) + if(!it.second.incoming_items.empty()) + { + RsItem *item = it.second.incoming_items.front(); + it.second.incoming_items.pop_front(); + return item; + } } return nullptr; } +int FsNetworkInterface::SendItem(RsItem *item) +{ + RS_STACK_MUTEX(mFsNiMtx); + + const auto& it = mConnections.find(item->PeerId()); + + if(it == mConnections.end()) + { + RsErr() << "Cannot send item to peer " << item->PeerId() << ": no pending sockets available." ; + delete item; + return 0; + } + + return it->second.pqi_thread->SendItem(item); +} + +void FsNetworkInterface::closeConnection(const RsPeerId& peer_id) +{ + const auto& it = mConnections.find(peer_id); + + if(it == mConnections.end()) + { + RsErr() << "Cannot close connection to peer " << peer_id << ": no pending sockets available." ; + return; + } + + if(!it->second.incoming_items.empty()) + { + RsErr() << "Trying to close an incoming connection with incoming items still pending! The items will be lost." << std::endl; + + for(auto& item:it->second.incoming_items) + delete item; + + it->second.incoming_items.clear(); + } + // Close the socket and delete everything. + + it->second.pqi_thread->fullstop(); + it->second.bio->close(); + + close(it->second.socket); + + delete it->second.pqi_thread; + delete it->second.bio; +} + + + + + + + + + + + + + + + diff --git a/retroshare-friendserver/src/network.h b/retroshare-friendserver/src/network.h index e842ec77c..5729b515c 100644 --- a/retroshare-friendserver/src/network.h +++ b/retroshare-friendserver/src/network.h @@ -22,20 +22,25 @@ #pragma once #include "util/rsthreads.h" +#include "pqi/pqi_base.h" #include "retroshare/rspeers.h" -class pqistreamer; +class pqithreadstreamer; +class FsBioInterface; struct ConnectionData { sockaddr client_address; int socket; - pqistreamer *pqi; + pqithreadstreamer *pqi_thread; + FsBioInterface *bio; + + std::list incoming_items; }; // This class handles multiple connections to the server and supplies RsItem elements -class FsNetworkInterface: public RsTickingThread +class FsNetworkInterface: public RsTickingThread, public PQInterface { public: FsNetworkInterface() ; @@ -43,7 +48,13 @@ public: // basic functionality - RsItem *GetItem(); + void closeConnection(const RsPeerId& peer_id); + + // Implements PQInterface + + bool RecvItem(RsItem *item) override; + int SendItem(RsItem *item) override; + RsItem *GetItem() override; // Implements RsTickingThread